Unit testing a Python Flask stream
Does anyone have experience with, or pointers on, testing a Flask content-streaming resource? My application uses Redis pub/sub: when it receives a message on a channel, it streams the text 'data: {"value":42}'. The implementation follows the Flask docs, and the unit tests are written following the Flask docs too. The messages to Redis pub/sub are sent by a second resource (POST).
I'm creating a thread to listen to the stream while I POST to the main application resource that publishes to Redis.
Although the connection assertions pass (I receive OK 200 and the mimetype 'text/event-stream'), the data object is empty.
My unit test is this:
    def test_04_receiving_events(self):
        headers = [('content-type', 'application/json')]
        data = json.dumps({"value": 42})
        headers.append(('content-length', len(data)))

        def get_stream():
            rv_stream = self.app.get('stream/data')
            rv_stream_object = json.loads(rv_stream.data)  # error (empty)
            self.assertEqual(rv_stream.status_code, 200)
            self.assertEqual(rv_stream.mimetype, 'text/event-stream')
            self.assertEqual(rv_stream_object, "data: {'value': 42}")
            t.stop()

        threads = []
        t = Thread(target=get_stream)
        threads.append(t)
        t.start()
        time.sleep(1)
        rv_post = self.app.post('/sensor', headers=headers, data=data)
        threads_done = False
        while not threads_done:
            threads_done = True
            for t in threads:
                if t.is_alive():
                    threads_done = False
                    time.sleep(1)
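One thing that may be worth checking in the test above: an assertion that fails inside a worker thread raises in that thread only, so it does not fail the unittest case. A minimal sketch of handing the streamed chunk back to the main thread through a queue, with no Flask or Redis involved. The names `fake_stream`, `first_chunk`, `messages` and `results` are hypothetical stand-ins, not part of my app.

```python
import queue
import threading


def fake_stream(source):
    """Stand-in for the streaming response: blocks until a message arrives."""
    while True:
        message = source.get()  # blocks, much like pubsub.listen() would
        yield 'data: {"value":%s}\n\n' % message


def first_chunk(stream, out):
    """Worker: push the first chunk the stream yields into `out`."""
    out.put(next(stream))


messages = queue.Queue()  # hypothetical stand-in for the Redis channel
results = queue.Queue()   # hand-off from the worker to the test thread

worker = threading.Thread(
    target=first_chunk,
    args=(fake_stream(messages), results),
    daemon=True,  # a stuck worker will not keep the test run alive
)
worker.start()

messages.put(42)                # plays the role of the POST to /sensor
chunk = results.get(timeout=5)  # raises queue.Empty if nothing was streamed
worker.join(timeout=5)

# The assertion now runs in the main thread, where a failure is visible.
assert chunk == 'data: {"value":42}\n\n'
```

The timeout on `results.get` means a stream that never yields shows up as a `queue.Empty` error rather than a hang.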
The app resource is:
    @app.route('/stream/data')
    def stream():
        def generate():
            pubsub = db.pubsub()
            pubsub.subscribe('interesting')
            for event in pubsub.listen():
                if event['type'] == 'message':
                    yield 'data: {"value":%s}\n\n' % event['data']

        return Response(stream_with_context(generate()),
                        direct_passthrough=True,
                        mimetype='text/event-stream')
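Since generate() emits event-stream frames ('data: ...' followed by a blank line) rather than bare JSON, calling json.loads on the raw body would likely fail even when the body is not empty. A minimal sketch of stripping the framing first, using only the standard library. The helper name `parse_sse_data` is hypothetical, and it assumes one `data:` line per event, as in the resource above.

```python
import json


def parse_sse_data(raw):
    """Return the JSON-decoded payload of every `data:` line in an SSE body.

    `raw` may be bytes (as a test client returns) or str.
    """
    text = raw.decode("utf-8") if isinstance(raw, bytes) else raw
    payloads = []
    # Events are separated by a blank line; each event here has one data line.
    for frame in text.split("\n\n"):
        for line in frame.splitlines():
            if line.startswith("data:"):
                payloads.append(json.loads(line[len("data:"):].strip()))
    return payloads


# One frame shaped like the output of generate() above.
body = b'data: {"value":42}\n\n'
print(parse_sse_data(body))  # prints [{'value': 42}]
```

With a helper like this, the test could compare the decoded payload against {"value": 42} instead of comparing against the framed string.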
Any pointers or examples of how to test a content stream in Flask? Google doesn't seem to be much help on this one, unless I'm searching with the wrong keywords.
Thanks in advance.