File Source and Sink
To learn more about this application and the supported properties, please review the following link.
License: Apache License 2.0
The option to delete the processed file would be a very useful feature for the file-source app in Spring Cloud Stream; the sftp-source has already implemented it. Please consider adding this feature to the file-source app as well. Apache Camel provides the additional options of moving files to a backup folder and deleting the file from the input directory after injecting it into the flow.
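As a rough illustration of what such an option could do, here is a minimal sketch using Spring Integration's Java DSL, assuming a hypothetical /tmp/input directory; this is not the actual file-source code, just one way the behavior might be expressed:

import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.file.dsl.Files;

@Configuration
public class DeleteAfterReadSketch {

    @Bean
    public IntegrationFlow fileReadingFlow() {
        return IntegrationFlows
                .from(Files.inboundAdapter(new File("/tmp/input")),
                        e -> e.poller(Pollers.fixedDelay(1000)))
                .transform(File.class, file -> {
                    try {
                        // Read the contents, then delete the processed file from
                        // the input directory, as the issue requests. (java.nio.file.Files
                        // is fully qualified to avoid clashing with the DSL's Files.)
                        byte[] bytes = java.nio.file.Files.readAllBytes(file.toPath());
                        file.delete();
                        return new String(bytes, StandardCharsets.UTF_8);
                    }
                    catch (IOException ex) {
                        throw new UncheckedIOException(ex);
                    }
                })
                .channel("output")
                .get();
    }
}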
It would be nice to have support for pluggable metadata store like the aggregator has today:
java -jar aggregator-rabbit-1.0.0.RELEASE.jar \
  --aggregator.message-store-type=jdbc \
  --spring.datasource.url=jdbc:h2:mem:test \
  --spring.datasource.schema=org/springframework/integration/jdbc/schema-h2.sql
This would allow us to plug in a metadata store that keeps track of files that have already been processed, avoiding reprocessing after a restart, for example.
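Spring Integration already ships the building blocks for this; a minimal sketch of wiring a JDBC-backed metadata store into a persistent accept-once file filter might look as follows (the integration into the file-source app itself is the hypothetical part):

import javax.sql.DataSource;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.file.filters.FileSystemPersistentAcceptOnceFileListFilter;
import org.springframework.integration.jdbc.metadata.JdbcMetadataStore;
import org.springframework.integration.metadata.ConcurrentMetadataStore;

@Configuration
public class MetadataStoreSketch {

    @Bean
    public ConcurrentMetadataStore metadataStore(DataSource dataSource) {
        // Backed by the INT_METADATA_STORE table (created from the standard
        // Spring Integration schema scripts), so processed-file state
        // survives application restarts.
        return new JdbcMetadataStore(dataSource);
    }

    @Bean
    public FileSystemPersistentAcceptOnceFileListFilter fileFilter(ConcurrentMetadataStore store) {
        // Records each accepted file (name plus lastModified) under the given
        // key prefix; files already recorded in the store are not re-emitted.
        return new FileSystemPersistentAcceptOnceFileListFilter(store, "file-source:");
    }
}

The filter would then be set on the file-reading message source in place of the default in-memory one.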
As a user, I'm trying to ingest large files using the file-source, so I'd like a sample solution I can refer to in order to use it the right way.
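A hedged suggestion for what such a sample might demonstrate: with --mode=ref the source emits only a reference to the file rather than its contents, so the payload stays small no matter how large the file is, e.g.:

file --mode=ref --directory=/data | log

The downstream consumer can then stream the file from disk at its own pace, provided it shares the filesystem with the source.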
Acceptance:
From @sabbyanandan on May 13, 2016 14:1
From @salgmachine on May 8, 2016 0:2
Hi,
I just had the following issue:
The documentation states:
"dir" - the absolute path to the directory to monitor for files
I took a look at the class "FileSourceProperties"; the variable which gets passed the directory parameter is called "directory".
To reproduce:
The following stream definition didn't work (nothing was logged):
file --mode=ref --dir=/home/kafka/test | log
The following stream definition worked (log now showing expected output):
file --mode=ref --directory=/home/kafka/test | log
I guess it's just an error in the documentation, but it has some impact on people taking their first steps with the stream modules.
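For context, an abridged, paraphrased sketch of the properties class in question (not verbatim from the project) shows why --directory is the option that binds:

import org.springframework.boot.context.properties.ConfigurationProperties;

@ConfigurationProperties("file")
public class FileSourceProperties {

    // Bound from file.directory, i.e. --directory in a stream
    // definition; there is no "dir" property.
    private String directory;

    public String getDirectory() {
        return this.directory;
    }

    public void setDirectory(String directory) {
        this.directory = directory;
    }
}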
Copied from original issue: spring-cloud/spring-cloud-stream-modules#258
Copied from original issue: spring-cloud/spring-cloud-stream-app-starters#92
In ref mode, leave the contentType header unset. This would allow the message consumer to use the spring.cloud.stream.bindings.output.contentType property to set the desired type, for example spring.cloud.stream.bindings.output.contentType=text/plain.
Note that this is a workaround. A permanent fix should be made in the core's FileUtils.enhanceFlowForReadingMode method instead.
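A hedged example of how that property might be supplied when deploying through Spring Cloud Data Flow (the app.file prefix targets the file app in the stream; the stream name foo is made up):

stream deploy --name foo --properties "app.file.spring.cloud.stream.bindings.output.contentType=text/plain"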
As a user, I have a stream that includes the file source with --mode=ref, and on every new file event I'm getting an IllegalArgumentException. This is happening on both the Avogadro-GA (1.1.0.GA) and Avogadro-SR1 (1.1.1.GA) releases; it works with the 1.0.2.GA release, though.
stream:
stream create foo --definition "file --filename-pattern=*.jpg --directory=/data --mode=ref | log"
error:
2017-01-29 16:03:24.985 DEBUG 9088 --- [ask-scheduler-1] o.s.i.e.SourcePollingChannelAdapter : Poll resulted in Message: GenericMessage [payload=/data/salsa.jpg, headers={id=2d361087-8d6e-5161-e935-30e84c6803ec, timestamp=1485734604985}]
2017-01-29 16:03:24.985 DEBUG 9088 --- [ask-scheduler-1] o.s.integration.channel.DirectChannel : preSend on channel 'output', message: GenericMessage [payload=/data/salsa.jpg, headers={id=2d361087-8d6e-5161-e935-30e84c6803ec, timestamp=1485734604985}]
2017-01-29 16:03:24.985 DEBUG 9088 --- [ask-scheduler-1] o.s.i.channel.PublishSubscribeChannel : preSend on channel 'errorChannel', message: ErrorMessage [payload=org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'output'; nested exception is java.lang.IllegalArgumentException: payload must not be null, headers={id=9900c562-5037-7551-feb7-a5ad9f31aba4, timestamp=1485734604985}]
2017-01-29 16:03:24.985 DEBUG 9088 --- [ask-scheduler-1] o.s.integration.handler.LoggingHandler : _org.springframework.integration.errorLogger.handler received message: ErrorMessage [payload=org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'output'; nested exception is java.lang.IllegalArgumentException: payload must not be null, headers={id=9900c562-5037-7551-feb7-a5ad9f31aba4, timestamp=1485734604985}]
2017-01-29 16:03:24.986 ERROR 9088 --- [ask-scheduler-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'output'; nested exception is java.lang.IllegalArgumentException: payload must not be null
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:449)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105)
at org.springframework.integration.endpoint.SourcePollingChannelAdapter.handleMessage(SourcePollingChannelAdapter.java:203)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:272)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.access$000(AbstractPollingEndpoint.java:58)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:190)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:186)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:353)
at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:55)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:51)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:344)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalArgumentException: payload must not be null
at org.springframework.util.Assert.notNull(Assert.java:115)
at org.springframework.integration.support.MutableMessage.<init>(MutableMessage.java:57)
at org.springframework.integration.support.MutableMessage.<init>(MutableMessage.java:53)
at org.springframework.integration.support.MutableMessageBuilder.withPayload(MutableMessageBuilder.java:86)
at org.springframework.integration.support.MutableMessageBuilderFactory.withPayload(MutableMessageBuilderFactory.java:35)
at org.springframework.integration.support.MutableMessageBuilderFactory.withPayload(MutableMessageBuilderFactory.java:26)
at org.springframework.cloud.stream.binding.MessageConverterConfigurer$ContentTypeConvertingInterceptor.preSend(MessageConverterConfigurer.java:194)
at org.springframework.integration.channel.AbstractMessageChannel$ChannelInterceptorList.preSend(AbstractMessageChannel.java:538)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:415)
... 23 more
I am using the file-source with the s3 sink, and I want to use this header to upload the file to S3 under the same name.
I can see a few headers (like Content-Type, id and timestamp), but file_name is not among them.
I was assigning key-expression = "headers['file_name']", but I get an error message that no such header exists.
Can you tell me what the issue is? As far as I can see in this doc, https://github.com/spring-cloud-stream-app-starters/file/tree/master/spring-cloud-starter-stream-source-file, with mode = contents the headers should be these:
Content-Type: application/octet-stream
file_originalFile: <java.io.File>
file_name: <file name>
From @ljtfreitas on October 24, 2017 19:0
Hello,
In the company I work for, we are strongly considering using Spring Cloud Data Flow in our data pipeline projects. In our scenario, we need to start some streams from files in gz format. Before we start developing our own code, is there an existing app for this purpose in spring-cloud-dataflow?
Thanks!
Copied from original issue: spring-cloud/spring-cloud-dataflow#1716
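In case it helps, a minimal sketch of a custom gunzip processor that could sit between a file source running with --mode=contents (so the payload arrives as a byte[]) and the rest of the stream; the class name and the gunzip app name are hypothetical, while GZIPInputStream and the Spring Cloud Stream annotations shown are standard:

import java.io.ByteArrayInputStream;
import java.util.zip.GZIPInputStream;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.integration.annotation.Transformer;
import org.springframework.util.StreamUtils;

@EnableBinding(Processor.class)
public class GunzipProcessor {

    // Receives the gz file contents and emits the decompressed bytes downstream.
    @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
    public byte[] gunzip(byte[] compressed) throws Exception {
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            return StreamUtils.copyToByteArray(in);
        }
    }
}

Registered as a processor app, it could then be used as file --mode=contents --directory=/data | gunzip | log.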