hadoop - Storm-jms Spout collecting Avro messages and sending down stream? -
i new avro format. trying collect avro messages jms queue using storm-jms spout , send them hdfs using hdfs bolt.
queue sending avro not able them in avro format using hdfs bolt.
how collect avro message , send them downstream without encoding errors in hdfs.
the existing hdfs bolt not support writing avro files need overcome making following changes. in sample code using getting jms messages spout , converting jms bytes message avro , emmiting them hdfs.
this code can serve sample modifying methods in abstracthdfsbolt.
public void execute(tuple tuple) { try { long length = bytesmessage.getbodylength(); byte[] bytes = new byte[(int)length]; /////////////////////////////////////// bytesmessage.readbytes(bytes); string replymessage = new string(bytes, "utf-8"); datumreader = new specificdatumreader<indexedrecord>(schema); decoder = decoderfactory.get().binarydecoder(bytes, null); result = datumreader.read(null, decoder); synchronized (this.writelock) { datafilewriter.append(result); datafilewriter.sync(); this.offset += bytes.length; if (this.syncpolicy.mark(tuple, this.offset)) { if (this.out instanceof hdfsdataoutputstream) { ((hdfsdataoutputstream) this.out).hsync(enumset.of(syncflag.update_length)); } else { this.out.hsync(); this.out.flush(); } this.syncpolicy.reset(); } datafilewriter.flush(); } if(this.rotationpolicy.mark(tuple, this.offset)){ rotateoutputfile(); // synchronized this.offset = 0; this.rotationpolicy.reset(); } } catch (ioexception | jmsexception e) { log.warn("write/sync failed.", e); this.collector.fail(tuple); } } @override void closeoutputfile() throws ioexception { this.out.close(); } @override path createoutputfile() throws ioexception { path path = new path(this.filenameformat.getpath(), this.filenameformat.getname(this.rotation, system.currenttimemillis())); this.out = this.fs.create(path); datafilewriter.create(schema, out); return path; } @override void doprepare(map conf, topologycontext topologycontext,outputcollector collector) throws ioexception { // todo auto-generated method stub log.info("preparing hdfs bolt..."); try { schema = new schema.parser().parse(new file("/home/*******/********schemafilename.avsc")); } catch (ioexception e1) { e1.printstacktrace(); } this.fs = filesystem.get(uri.create(this.fsurl), hdfsconfig); datumwriter = new specificdatumwriter<indexedrecord>(schema); datafilewriter = new datafilewriter<indexedrecord>(datumwriter); jmsavroutils jasv = new jmsavroutils(); }
Comments
Post a Comment