crabhi / celery-java
Java implementation of Celery client and worker
License: MIT License
Celery.builder() cannot be resolved. I am trying to implement a worker in Java and a client in Python. CeleryWorker.create is also causing problems, and in the end an error says that TestTask.sum is not registered, on both the Java and the Python side. Please help, it is very urgent.
I am trying to write the example in Kotlin:
/*
* This Kotlin source file was generated by the Gradle 'init' task.
*/
package poc_kotlin_celery
import com.geneea.celery.*
fun main() {
    val client: Celery = Celery.builder()
        .brokerUri("amqp://localhost/%2F")
        .backendUri("rpc://localhost/%2F")
        .build();
    val args = arrayOf(1, 2)
    println("${client::class.simpleName}")
    println("${client::class.qualifiedName}")
    val result: Any = client.submit("task.add", args)
    // println("${result::class.simpleName}")
    // val suma: Int = result.get()
    //> println(suma)
}
But I have this error when I try to run:
$ gradle run
> Task :app:compileKotlin FAILED
e: /home/miguel/code/poc_kotlin_celery/app/src/main/kotlin/poc_kotlin_celery/App.kt: (19, 29): Not enough information to infer type variable R
FAILURE: Build failed with an exception.
* What went wrong:
Execution failed for task ':app:compileKotlin'.
> Compilation error. See log for more details
* Try:
Run with --stacktrace option to get the stack trace. Run with --info or --debug option to get more log output. Run with --scan to get full insights.
* Get more help at https://help.gradle.org
BUILD FAILED in 758ms
1 actionable task: 1 executed
I think the problem may be the type of result, but I also tried AsyncResult<Int> and it failed too.
Is this project still being maintained?
No signature of method: static com.geneea.celery.Celery.builder() is applicable for argument types: () values: []
Possible solutions: find()
at com.geneea.celery.ClientWithBackendTest.setup
Great work on this! Here's an issue to track adding SQS support, which I'd need to start experimenting with this at my company.
Make it possible to call a task with keyword arguments and dispatch it correctly in the Java worker.
Currently, the library cannot send a message to a queue with a priority, because the code sets the priority to null by default.
Please check whether it would be fine to add this feature, which is widely used in production environments.
#13
It should be possible to annotate an interface without implementation to generate the proxy for Python code.
Like this:
public interface PostcodeTask {
@Task(name="postcode.inflect_city")
String inflectCity(String postCode, int wordCase);
}
You would then call a proxy class as usual:
Future<String> result = PostcodeTaskProxy.with(client).inflectCity("10100", 1);
// computation performed by Python function inflect_city in postcode.py
assertEquals("Praha", result.get());
Not all methods of a class need to be a task. Support per-method annotation.
I found a bug when testing the main function: the message is always sent to the queue 'celery'.
When digging into the code, I found
public void send(String queue) throws IOException {
    AMQP.BasicProperties messageProperties = props.headers(headers.map).build();
    channel.basicPublish("", "celery", messageProperties, body);
}
the arg 'queue' is actually never used.
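One way the fix could look, as a minimal sketch: pass the queue argument through as the routing key instead of the literal "celery". Publisher, RecordingPublisher and QueueAwareMessage are hypothetical stand-ins invented for this example so it runs without a broker; they are not celery-java or RabbitMQ classes, and the message properties are left out for brevity.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the broker channel; only the routing key matters here.
interface Publisher {
    void basicPublish(String exchange, String routingKey, byte[] body);
}

// Records routing keys so the behaviour can be checked without a broker.
class RecordingPublisher implements Publisher {
    final List<String> routingKeys = new ArrayList<>();

    @Override
    public void basicPublish(String exchange, String routingKey, byte[] body) {
        routingKeys.add(routingKey);
    }
}

// Hypothetical message wrapper mirroring the send(String queue) method above.
class QueueAwareMessage {
    private final Publisher channel;
    private final byte[] body;

    QueueAwareMessage(Publisher channel, byte[] body) {
        this.channel = channel;
        this.body = body;
    }

    // Use the caller's queue as the routing key on the default exchange ("").
    void send(String queue) {
        channel.basicPublish("", queue, body);
    }
}

class QueueRoutingDemo {
    public static void main(String[] args) {
        RecordingPublisher publisher = new RecordingPublisher();
        new QueueAwareMessage(publisher, new byte[0]).send("priority_tasks");
        System.out.println(publisher.routingKeys);
    }
}
```

With the default exchange, the routing key selects the queue of the same name, so forwarding the argument should be enough for the message to leave the 'celery' queue.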
#11
The file velocity.properties should be namespaced.
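When celery-java runs a task, it loads the task class information and uses the TaskLoader.vm and TaskProxy.vm templates to generate sources under annotations. There is a bug in loading the class information: when my task class sits directly under java, it has no package information by default, so an incorrect template output is generated.
com.geneea.celery.annotationprocessor.TaskProcessor.process contains the following code: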
PackageElement packageElement = (PackageElement) taskClassElem.getEnclosingElement();
Name packageName = packageElement.getQualifiedName();
writeProxy(taskClassElem, methods, packageName);
writeLoader(taskClassElem, packageName);
Here packageName is null, which causes the template to be generated incorrectly.
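A minimal sketch of how a generator could guard against a missing package name, whether it arrives as null or as an empty string. PackageNames and its two helpers are hypothetical names for illustration only, not part of celery-java; the parameter is a CharSequence because javax.lang.model.element.Name extends CharSequence.

```java
class PackageNames {
    // Returns the "package x.y;" line for a template, or "" for the default package.
    static String packageDeclaration(CharSequence packageName) {
        if (packageName == null || packageName.length() == 0) {
            return "";
        }
        return "package " + packageName + ";";
    }

    // Builds the fully qualified name of a generated class, skipping the dot
    // when there is no package.
    static String qualifiedName(CharSequence packageName, String simpleName) {
        if (packageName == null || packageName.length() == 0) {
            return simpleName;
        }
        return packageName + "." + simpleName;
    }

    public static void main(String[] args) {
        System.out.println(packageDeclaration("com.example"));
        System.out.println(qualifiedName("com.example", "TestTaskProxy"));
        System.out.println(qualifiedName("", "TestTaskProxy"));
    }
}
```

A template could then emit the declaration line only when it is non-empty, so that classes in the default package still produce compilable output.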
Creating the client lazily would make it easier to handle connection failures. Client creation currently depends on an established connection. Instead, it should be possible to create the client at any time, with task submission failing only if it cannot connect to the queue.
Example:
client = new Celery("amqp://guest:guest@somerabbit//", "rpc://guest:guest@somerabbit//", "celery");
// The server somerabbit is not running yet.
client.submit(...); // throws IOException
// Now the server has started.
client.submit(...); // succeeds without any other action taken
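The behaviour described above can be sketched with a lazily created, cached connection: nothing is opened at construction time, a failed attempt leaves nothing cached, and the next call retries. ConnectionFactory and LazyConnection are hypothetical names for this illustration, and a plain String stands in for the real connection object; this is not the library's implementation.

```java
import java.io.IOException;

// Hypothetical factory type: opens a connection or throws IOException.
interface ConnectionFactory<T> {
    T connect() throws IOException;
}

class LazyConnection<T> {
    private final ConnectionFactory<T> factory;
    private T connection; // stays null until the first successful connect

    LazyConnection(ConnectionFactory<T> factory) {
        this.factory = factory;
    }

    // Connects on first use and reuses the same connection afterwards.
    // If connect() throws, the field stays null and the next call retries.
    synchronized T get() throws IOException {
        if (connection == null) {
            connection = factory.connect();
        }
        return connection;
    }
}

class LazyConnectionDemo {
    public static void main(String[] args) throws IOException {
        boolean[] brokerUp = {false};
        int[] attempts = {0};
        LazyConnection<String> lazy = new LazyConnection<>(() -> {
            attempts[0]++;
            if (!brokerUp[0]) {
                throw new IOException("broker not reachable");
            }
            return "connection#" + attempts[0];
        });

        try {
            lazy.get();
        } catch (IOException e) {
            System.out.println("first submit failed: " + e.getMessage());
        }

        brokerUp[0] = true; // the server has started in the meantime
        System.out.println(lazy.get());
        System.out.println(lazy.get()); // same cached connection, no new attempt
    }
}
```

Caching only successful results is the design choice that matters here: a memoizing wrapper that also remembered the failure would keep the client broken after the broker comes up.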
Exception in thread "main" java.lang.IncompatibleClassChangeError: Class com.google.common.base.Suppliers$MemoizingSupplier does not implement the requested interface java.util.function.Supplier
at com.geneea.celery.Celery.submit(Celery.java:129)
Java client should optionally accept a map of arguments. These should be passed as kwargs.
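As a rough sketch of what this could mean on the wire: in Celery's message protocol version 2 the task body is the triple (args, kwargs, embed), so a map passed by the caller would travel as the second element. TaskBody.build is a hypothetical helper, the kwarg names post_code and word_case are made up for the example, and JSON serialization is left out.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class TaskBody {
    // Builds the (args, kwargs, embed) triple of a protocol v2 task body.
    static List<Object> build(List<Object> args, Map<String, Object> kwargs) {
        // The embed fields are left empty in this sketch.
        Map<String, Object> embed = new LinkedHashMap<>();
        embed.put("callbacks", null);
        embed.put("errbacks", null);
        embed.put("chain", null);
        embed.put("chord", null);
        return Arrays.asList(args, kwargs, embed);
    }

    public static void main(String[] a) {
        Map<String, Object> kwargs = new LinkedHashMap<>();
        kwargs.put("post_code", "10100"); // hypothetical keyword argument
        kwargs.put("word_case", 1);       // hypothetical keyword argument
        System.out.println(build(List.of(), kwargs));
    }
}
```

On the Python side such a body would correspond to a call like inflect_city(post_code="10100", word_case=1), assuming the task accepts those parameter names.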
Hello, I'm using the library with a web service, but every time I publish a message, a connection is opened and never closed. After a few hours the RAM fills up because of too many connections and everything crashes.
Is it possible to keep a single connection open and only manage channels on it?
Thank you,
Diego
ps: sorry for my english
Since the Celery client and worker may be deployed on different machines, letting the client side record the task id is very useful when tracking down problems.
Currently, the client knows the task id, but it doesn't reveal it to the user.
I suggest exposing the task id to the user who submits a task through the Celery client.
Please consider it.
It seems that celery-java does not support the Redis broker.
This is my code:
Celery client = Celery.builder()
        .brokerUri("redis://:M2EzYmNiYzc3Y2IyZWFiNzRhMjkxOGU4ODJiY2YyOTJhOWVmMTQ5YjEwOGZkODFl@10.19.0.11:6379/3")
        // .backendUri("rpc://:M2EzYmNiYzc3Y2IyZWFiNzRhMjkxOGU4ODJiY2YyOTJhOWVmMTQ5YjEwOGZkODFl@10.19.0.11:6379/3")
        .build();
try {
    System.out.println(client.submit("tasks.add", new Object[]{1, 2}).get());
    // client.submit(TestTask.class, "sum", new Object[]{"a", "b"}).get();
} finally {
}
This is the error message:
Exception in thread "main" com.geneea.celery.UnsupportedProtocolException: Unsupported protocol: redis. Supported protocols are: amqp, amqps
at com.geneea.celery.brokers.CeleryBrokers.createBroker(CeleryBrokers.java:46)
at com.geneea.celery.Celery.lambda$new$0(Celery.java:64)
at com.google.common.base.Suppliers$NonSerializableMemoizingSupplier.get(Suppliers.java:160)
at com.geneea.celery.Celery.submit(Celery.java:144)
at com.caih.api.admin.utils.celery.CeleryUtils.main(CeleryUtils.java:30)
When creating the queue I get the error:
java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:129)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:125)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:147)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:968)
at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.queueDeclare(AutorecoveringChannel.java:333)
at com.geneea.celery.CeleryWorker.create(CeleryWorker.java:187)
at com.geneea.celery.CeleryWorker.main(CeleryWorker.java:216)
at com.fabrikado.materialise_wrapper.MaterialiseApp.main(MaterialiseApp.java:60)
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'celery_materialise' in vhost 'v_host': received 'true' but current is 'false', class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:502)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:293)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:141)
... 5 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'celery_materialise' in vhost 'v_host': received 'true' but current is 'false', class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:517)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:341)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:182)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:114)
at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:739)
at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:47)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:666)
at java.base/java.lang.Thread.run(Thread.java:844)
Since the value is hardcoded in the function create of CeleryWorker.java, it cannot be changed.
I am calling a Python task from Java, and the Java app needs to get the task result.
But I see that each message stays unacked in RabbitMQ, and after about 5 calls RabbitMQ disconnects the Java app.
Channel error on connection <0.13121.2> ([::1]:39192 -> [::1]:5672, vhost: '/', user: 'admin'), channel 1: operation none caused a channel exception precondition_failed: delivery acknowledgement on channel 1 timed out. Timeout value used: 1800000 ms. This timeout value can be configured, see consumers doc guide to learn more
How can I resolve this? Thanks.