Unit testing Python Flask Stream -


does have experience/pointers on testing flask content streaming resource? application uses redis pub/sub, when receiving message in channel streams text 'data: {"value":42}' implementation following flask docs at: docs , unit tests done following flask docs too. messages redis pub/sub sent second resource (post).

i'm creating thread listen stream while post on main application resource publishes redis.

although connection assertions pass (i receive ok 200 , mimetype 'text/event-stream') data object empty.

my unit test this:

def test_04_receiving_events(self):     headers = [('content-type', 'application/json')]     data = json.dumps({"value": 42})     headers.append(('content-length', len(data)))      def get_stream():         rv_stream = self.app.get('stream/data')         rv_stream_object = json.loads(rv_stream.data) #error (empty)         self.assertequal(rv_stream.status_code, 200)         self.assertequal(rv_stream.mimetype, 'text/event-stream')         self.assertequal(rv_stream_object, "data: {'value': 42}")         t.stop()      threads = []     t = thread(target=get_stream)     threads.append(t)     t.start()     time.sleep(1)     rv_post = self.app.post('/sensor', headers=headers, data=data)      threads_done = false     while not threads_done:         threads_done = true         t in threads:             if t.is_alive():                 threads_done = false                 time.sleep(1) 

the app resource is:

@app.route('/stream/data') def stream():     def generate():         pubsub = db.pubsub()         pubsub.subscribe('interesting')          event in pubsub.listen():              if event['type'] == 'message':                 yield 'data: {"value":%s}\n\n' % event['data']     return response(stream_with_context(generate()),                 direct_passthrough=true,                 mimetype='text/event-stream') 

any pointers or examples of how test content stream in flask? google seems not on one, unless i'm searching wrong keywords.

thanks in advance.


Comments

Popular posts from this blog

twig - Using Twigbridge in a Laravel 5.1 Package -

firemonkey - How do I make a beep sound in Android using Delphi and the API? -

jdbc - Not able to establish database connection in eclipse -