Spring Integration - SFTP inbound adapter and transaction synchronization
I am using an SFTP inbound adapter with a custom filter. The implementation is: if the filter accepts a file, the file moves through the message flow and is delivered to a Spring Batch job using "batch-integration:job-launching-gateway". This is working fine. The problem is: if the filter rejects a file, I want to move it to the "/failed/" directory, and if a JDBC transaction fails during batch processing, I also want to move the file to the "/failed/" directory so I can handle it later. How can I synchronize the JDBC transaction with this scenario in Spring Integration?
I read the Spring Integration documentation (http://docs.spring.io/spring-integration/reference/html/transactions.html), but it is not clear to me. A sample app would be perfect; otherwise hints are OK.
Configuration:
<context:component-scan base-package="com.sftp.test" />
<import resource="beans-context.xml" />

<!-- default poller -->
<int:poller default="true" fixed-delay="5000"/>

<!-- SFTP inbound poller -->
<int-sftp:inbound-channel-adapter id="sftpinbondadapter"
        channel="filemessagechannel"
        session-factory="sftpsessionfactory"
        local-directory="${test.sftp.local.dir}"
        remote-directory="${test.sftp.remote.dir}"
        remote-file-separator="/"
        auto-create-local-directory="true"
        delete-remote-files="true"
        filename-pattern="*"
        local-filter="testfilefilter"
        preserve-timestamp="true"
        temporary-file-suffix=".writing">
    <int:poller cron="${test.file.poll.frequency}" max-messages-per-poll="1">
        <int:transactional transaction-manager="transactionmanager"
                           synchronization-factory="syncfactory" />
    </int:poller>
</int-sftp:inbound-channel-adapter>

<int:transaction-synchronization-factory id="syncfactory">
    <int:after-commit expression="payload.renameto('/tmp/dds/test/success/' + payload.name)"
                      channel="committedchannel" />
    <int:after-rollback expression="payload.renameto('/tmp/dds/test/failed/' + payload.name)"
                        channel="rolledbackchannel" />
</int:transaction-synchronization-factory>

<int:channel id="committedchannel">
    <int:queue />
</int:channel>
<int:logging-channel-adapter channel="committedchannel" />

<int:channel id="rolledbackchannel">
    <int:queue />
</int:channel>
<int:logging-channel-adapter channel="rolledbackchannel" />

<!-- channel for the file messages dropped by the inbound SFTP channel adapter -->
<int:channel id="filemessagechannel">
    <int:queue />
</int:channel>

<!-- transform the Spring Integration message into a job launch request -->
<int:transformer input-channel="filemessagechannel"
                 output-channel="jobrequestchannel"
                 id="joblaunchmessagetransformer">
    <bean class="com.sftp.test.test.util.filemessagetojobrequest">
        <property name="job" ref="testjob" />
        <property name="fileparametername" value="filename" />
    </bean>
</int:transformer>

<!-- job request channel -->
<int:channel id="jobrequestchannel">
    <int:queue />
</int:channel>

<!-- The job-launching gateway is used to launch batch jobs. Internally it delegates to a JobLaunchingMessageHandler. -->
<batch-integration:job-launching-gateway request-channel="jobrequestchannel"
                                         reply-channel="joblaunchreplychannel" />

<!-- job response channel -->
<int:channel id="joblaunchreplychannel">
    <int:queue />
</int:channel>

<!-- handle the response received from the job on joblaunchreplychannel -->
<int:outbound-channel-adapter channel="joblaunchreplychannel"
                              ref="fileprocessadapter" method="movefile"/>
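The problem is that you are using QueueChannels. As soon as the poller thread hands the request off to a queue channel, the poller's transaction will "commit", before the job has run.
You need to run the job on the poller thread (remove the <queue/> elements from the channels). Add a task executor to the poller if you want to run multiple jobs concurrently.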
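A minimal sketch of that change, reusing the bean ids from the question's configuration. The `pollerexecutor` bean and its pool size are assumptions made for illustration, and the `task` namespace is assumed to be declared in the context file.

```xml
<!-- Direct channels: without <int:queue/>, the downstream flow runs on the
     poller thread, inside the poller's transaction -->
<int:channel id="filemessagechannel" />
<int:channel id="jobrequestchannel" />
<int:channel id="joblaunchreplychannel" />

<!-- Optional (hypothetical id and pool size): lets several polls, and so
     several jobs, run concurrently -->
<task:executor id="pollerexecutor" pool-size="5" />

<!-- Replaces the poller nested inside the SFTP inbound adapter -->
<int:poller cron="${test.file.poll.frequency}" max-messages-per-poll="1"
            task-executor="pollerexecutor">
    <int:transactional transaction-manager="transactionmanager"
                       synchronization-factory="syncfactory" />
</int:poller>
```

With direct channels, an exception thrown anywhere in the flow propagates back to the poller, so the transaction rolls back and the after-rollback expression runs.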
Also, in order to reject and rename a file, you can't use the adapter's internal filter, because filtered files don't produce a message. Either rename the file within the filter, or use an AcceptAllFileListFilter in the adapter and use a <filter/> in the flow (configured to throw an exception on rejection).
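The second option could look roughly like this. The `acceptallfilter`, `testfileselector` and `acceptedfilechannel` names are hypothetical; `testfileselector` stands for a bean with a boolean `accept(java.io.File)` method that holds the logic previously in the custom filter.

```xml
<!-- Accept every local file, so even files that will be rejected produce a message -->
<bean id="acceptallfilter"
      class="org.springframework.integration.file.filters.AcceptAllFileListFilter" />

<!-- On the SFTP inbound adapter, set local-filter="acceptallfilter"
     instead of the custom filter -->

<!-- Filter in the flow: a rejection throws an exception on the poller thread,
     which rolls back the poller transaction and triggers the after-rollback expression -->
<int:filter input-channel="filemessagechannel" output-channel="acceptedfilechannel"
            ref="testfileselector" method="accept"
            throw-exception-on-rejection="true" />

<!-- Hypothetical channel for accepted files; the transformer would read from it -->
<int:channel id="acceptedfilechannel" />
```

The rollback only reaches the poller's transaction if `filemessagechannel` is a direct channel (no `<int:queue/>`), and the transformer's `input-channel` would need to point at `acceptedfilechannel`.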