Spring Integration - SFTP inbound adapter and transaction synchronization


I am using an SFTP inbound adapter with a custom filter. The implementation is: if the filter accepts a file, it moves through the message flow and is delivered to a Spring Batch job using "batch-integration:job-launching-gateway". This is working fine. The problem is that if the filter rejects a file, I want to move it to the "/failed/" directory, and if the JDBC transaction fails during batch processing, I also want to move the file to the "/failed/" directory so that I can handle it later. How can I synchronize the JDBC transaction with this scenario in Spring Integration?

I read the transactions chapter of the Spring Integration documentation (http://docs.spring.io/spring-integration/reference/html/transactions.html), but it is not clear to me. A sample app would be perfect; otherwise, hints are OK.

Configuration:

<context:component-scan base-package="com.sftp.test" />
<import resource="beans-context.xml" />

<!-- default poller -->
<int:poller default="true" fixed-delay="5000"/>

<!-- sftp inbound poller -->
<int-sftp:inbound-channel-adapter id="sftpinbondadapter"
    channel="filemessagechannel" session-factory="sftpsessionfactory"
    local-directory="${test.sftp.local.dir}" remote-directory="${test.sftp.remote.dir}"
    remote-file-separator="/" auto-create-local-directory="true"
    delete-remote-files="true" filename-pattern="*" local-filter="testfilefilter"
    preserve-timestamp="true" temporary-file-suffix=".writing">
    <int:poller cron="${test.file.poll.frequency}"
        max-messages-per-poll="1">
        <int:transactional transaction-manager="transactionmanager"
            synchronization-factory="syncfactory" />
    </int:poller>
</int-sftp:inbound-channel-adapter>

<int:transaction-synchronization-factory
    id="syncfactory">
    <int:after-commit expression="payload.renameTo('/tmp/dds/test/success/' + payload.name)"
        channel="committedchannel" />
    <int:after-rollback expression="payload.renameTo('/tmp/dds/test/failed/' + payload.name)"
        channel="rolledbackchannel" />
</int:transaction-synchronization-factory>

<int:channel id="committedchannel">
    <int:queue />
</int:channel>

<int:logging-channel-adapter channel="committedchannel" />

<int:channel id="rolledbackchannel">
    <int:queue />
</int:channel>

<int:logging-channel-adapter channel="rolledbackchannel" />

<!-- channel for the message carrying the file name dropped by the
     inbound sftp channel adapter -->
<int:channel id="filemessagechannel">
    <int:queue />
</int:channel>

<!-- transform the spring integration message into a job launch request -->
<int:transformer input-channel="filemessagechannel"
    output-channel="jobrequestchannel" id="joblaunchmessagetransformer">
    <bean class="com.sftp.test.test.util.filemessagetojobrequest">
        <property name="job" ref="testjob" />
        <property name="fileparametername" value="filename" />
    </bean>
</int:transformer>

<!-- job request channel -->
<int:channel id="jobrequestchannel">
    <int:queue />
</int:channel>

<!-- the job-launching-gateway is used to launch batch jobs. internally it
     delegates to the JobLaunchingMessageHandler. -->
<batch-integration:job-launching-gateway
    request-channel="jobrequestchannel" reply-channel="joblaunchreplychannel" />

<!-- job response channel -->
<int:channel id="joblaunchreplychannel">
    <int:queue />
</int:channel>

<!-- handle the response received from the job on joblaunchreplychannel -->
<int:outbound-channel-adapter channel="joblaunchreplychannel"
    ref="fileprocessadapter" method="movefile"/>

The problem is that you are using QueueChannels. As soon as the poller thread hands the request off to another thread, the poller's transaction will "commit", before the job has run.

You need to run the job on the poller thread (remove the <queue/> elements from the channels). Add a task executor to the poller if you want to run multiple jobs concurrently.
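
The change described above could look roughly like the following sketch. It reuses the channel ids and adapter settings from the question; the "jobexecutor" bean name and its pool size are hypothetical, and the "task" namespace is assumed to be declared in the context file.

```xml
<!-- Direct channels (no <int:queue/>): the downstream flow runs on the
     thread that performs the poll, inside the poller's transaction. -->
<int:channel id="filemessagechannel" />
<int:channel id="jobrequestchannel" />
<int:channel id="joblaunchreplychannel" />

<!-- Optional, only if several jobs should run concurrently.
     "jobexecutor" and pool-size="5" are hypothetical values. -->
<task:executor id="jobexecutor" pool-size="5" />

<int-sftp:inbound-channel-adapter id="sftpinbondadapter"
    channel="filemessagechannel" session-factory="sftpsessionfactory"
    local-directory="${test.sftp.local.dir}" remote-directory="${test.sftp.remote.dir}"
    remote-file-separator="/" auto-create-local-directory="true"
    delete-remote-files="true" filename-pattern="*" local-filter="testfilefilter"
    preserve-timestamp="true" temporary-file-suffix=".writing">
    <!-- Each poll, including its transaction, is executed on the task executor -->
    <int:poller cron="${test.file.poll.frequency}"
        max-messages-per-poll="1" task-executor="jobexecutor">
        <int:transactional transaction-manager="transactionmanager"
            synchronization-factory="syncfactory" />
    </int:poller>
</int-sftp:inbound-channel-adapter>
```

With direct channels, an exception thrown anywhere in the flow propagates back to the poller, so the transaction rolls back and the after-rollback expression in "syncfactory" is evaluated. Omit the task-executor attribute and the executor bean if one job at a time is enough.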

Also, in order to reject and rename a file, you can't use the adapter's internal filter, because filtered files don't produce a message. Either rename the file within the filter itself, or use an AcceptAllFileListFilter in the adapter and use a <filter/> in the flow (configured to throw an exception on rejection).
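
A minimal sketch of the second option follows. It assumes the acceptance logic is exposed by a hypothetical bean "fileacceptancecheck" with a method accept(java.io.File) that returns a boolean; the bean id "acceptallfilter" and the channel id "acceptedfilechannel" are also made up for illustration.

```xml
<!-- Let every local file through so that each one produces a message -->
<bean id="acceptallfilter"
    class="org.springframework.integration.file.filters.AcceptAllFileListFilter" />

<!-- On the existing sftpinbondadapter, change only this attribute:
     local-filter="acceptallfilter" -->

<int:channel id="acceptedfilechannel" />

<!-- Apply the acceptance check inside the flow. A rejected file raises an
     exception on the poller thread, which rolls back the transaction and
     triggers the after-rollback expression (the move to the failed directory). -->
<int:filter input-channel="filemessagechannel"
    output-channel="acceptedfilechannel"
    expression="@fileacceptancecheck.accept(payload)"
    throw-exception-on-rejection="true" />

<!-- The transformer from the question now consumes the accepted files -->
<int:transformer input-channel="acceptedfilechannel"
    output-channel="jobrequestchannel" id="joblaunchmessagetransformer">
    <bean class="com.sftp.test.test.util.filemessagetojobrequest">
        <property name="job" ref="testjob" />
        <property name="fileparametername" value="filename" />
    </bean>
</int:transformer>
```

Because an accept-all local filter does not remember which files it has already seen, this arrangement relies on the after-commit and after-rollback expressions moving each file out of the local directory; otherwise the same file would likely be picked up again on the next poll.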

