spring-attic / aggregator Goto Github PK
View Code? Open in Web Editor NEWThe Spring Cloud Stream Aggregator Application Starter
License: Apache License 2.0
The Spring Cloud Stream Aggregator Application Starter
License: Apache License 2.0
spring-cloud-starter-stream-processor-aggregator/src/main/resources/META-INF/spring-configuration-metadata-whitelist.properties
defines AggregatorProperties
rather than the full package path org.springframework.cloud.stream.app.aggregator.processor.AggregatorProperties
As a result the AggregatorProperties props are not whitelisted.
From @SafaArooj:
The documentation states, that by default aggregator app returns a collection of payloads and we don’t need to provide any aggregation
expression if that’s what we want. But while working with it I have observed a discrepancy. Although it does return a list but the elements of list are either object addresses or some sort of transformed byte arrays.
For example: If I provide no aggregation
expression the returned list looks something (Data sent to http using Postman)
["ew0KCSJwYXlsb2FkIjogew0KCQkiU09VUkNFX1RBQkxFIjogIklOVk9JQ0VfQUREUkVTUyIsDQoJCSJWRU5ET1JfTkJSIjogODQxMjk1MCwNCgkJIklOVk9JQ0VfSUQiOiAxMjc2NTEzMjYsDQoJCSJJTlZfQUREUl9UWVBFX0NPREUiOiAiUkUiDQoJfQ0KfQ==","ew0KCSJwYXlsb2FkIjogew0KCQkiU09VUkNFX1RBQkxFIjogIklOVk9JQ0VfQUREUkVTUyIsDQoJCSJJTlZPSUNFX0lEIjogMTI3NjUxMzI2DQoJfQ0KfQ=="]
But If I provide #this.![new String(payload)]
, the returned list is:
["{\r\n\t\"payload\": {\r\n\t\t\"SOURCE_TABLE\": \"INVOICE\",\r\n\t\t\"INVOICE_ID\": 127651326\r\n\t}\r\n}","{\r\n\t\"payload\": {\r\n\t\t\"SOURCE_TABLE\": \"INVOICE_ADDRESS\",\r\n\t\t\"INVOICE_ID\": 127651326\r\n\t}\r\n}"]
Which is a list of strings with actual message.
However if I use dataflow server shell to post messages to the http channel (when no aggregate expression is provided), it prints out the object addresses as following:
[[B@59418842, [B@76d9b1a]
Although providing the above mentioned aggregate expression resolves this issue but before that I looked into the code and modifying the DefaultAggregatingMessageGroupProcessor
worked for me as well. I changed the code to following:
for (Message<?> message : messages) {
if (message.getPayload() instanceof byte[]) {
String contentType = message.getHeaders().containsKey(MessageHeaders.CONTENT_TYPE)
? message.getHeaders().get(MessageHeaders.CONTENT_TYPE).toString()
: BindingProperties.DEFAULT_CONTENT_TYPE.toString();
if (contentType.contains("text") || contentType.contains("json") || contentType.contains("x-spring-tuple")) {
message = new MutableMessage<>(new String(((byte[]) message.getPayload())), message.getHeaders());
}
payloads.add(message.getPayload());
}
}
The above returns a list of actual messages without providing any aggregate expression as opposed to the actual aggregator app. The DSL I used with my custom app is the following:
http --server.port=8074 | aggApp --aggregator.release='size() == 2' --aggregator.correlation=#jsonPath(payload,'payload.INVOICE_ID') | log
The version I am using is “2.1.0.RELEASE”. If you believe that this is not a bug but an intended behavior of the app then can you please at least update the documentation to use `#this.![new String(payload)]' aggregate expression for the retrieval of actual messages. Because for a SpEL and a spring cloud data flow novice like me it was extremely frustrating to figure this thing out.
Logs
_Source : Data flow server shell_
`2019-06-06 17:36:22.006 INFO 18136 --- [container-0-C-1] log-sink : [[B@771453a9, [B@2e2c8d1e]
_Source : Postman_
2019-06-06 17:36:58.817 INFO 18136 --- [container-0-C-1] log-sink : ["ew0KCSJwYXlsb2FkIjogew0KCQkiU09VUkNFX1RBQkxFIjogIklOVk9JQ0VfQUREUkVTUyIsDQoJCSJJTlZPSUNFX0lEIjogMTI3NjUxMzI2DQoJfQ0KfQ==","ew0KCSJwYXlsb2FkIjogew0KCQkiU09VUkNFX1RBQkxFIjogIklOVk9JQ0UiLA0KCQkiSU5WT0lDRV9JRCI6IDEyNzY1MTMyNg0KCX0NCn0="]
_Source : Data flow server shell_
2019-06-06 17:39:25.698 INFO 6596 --- [container-0-C-1] log-sink : [{"payload":{"SOURCE_TABLE":"INVOICE","INVOICE_ID":127651326}}, {"payload":{"SOURCE_TABLE":"INVOICE_ADDRESS","INVOICE_ID":127651326}}]
_Source : Postman_
2019-06-06 17:39:37.280 INFO 6596 --- [container-0-C-1] log-sink : ["{\r\n\t\"payload\": {\r\n\t\t\"SOURCE_TABLE\": \"INVOICE_ADDRESS\",\r\n\t\t\"INVOICE_ID\": 127651326\r\n\t}\r\n}","{\r\n\t\"payload\": {\r\n\t\t\"SOURCE_TABLE\": \"INVOICE\",\r\n\t\t\"INVOICE_ID\": 127651326\r\n\t}\r\n}"]
As a user, when using the aggregator
application, though I have the flexibility to switch between message stores, I'd want the property whitelisting to be automatically filtered based on the "selected" (or defaulted) message store as opposed to showing all the options across all the supported stores.
Hi all
When setting up persistence with REDIS, I get the error "Cannot store messages without an ID header". To resolve this, I have to manually add the property "spring.cloud.stream.kafka.default.consumer.standard-headers = both" to map the ID and TIMESTAMP headers of Spring Integration in Spring Kafka. As the Spring Integration header ID is required for persistence with REDIS, could this property be added automatically by the app (I looked for a while before finding the solution!)?
The version I am using is “2.4.0.RELEASE”.
A declarative, efficient, and flexible JavaScript library for building user interfaces.
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google ❤️ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.