hadoop - Storm-jms Spout collecting Avro messages and sending down stream? -


I am new to the Avro format. I am trying to collect Avro messages from a JMS queue using the Storm-JMS spout and send them to HDFS using the HDFS bolt.

The queue is sending Avro, but I am not able to get the messages in Avro format using the HDFS bolt.

How do I collect the Avro messages and send them downstream without encoding errors in HDFS?

The existing HDFS bolt does not support writing Avro files, so we need to overcome this by making the following changes. In this sample code I am getting JMS messages from the spout, converting the JMS bytes message to Avro, and emitting the records to HDFS.

This code can serve as a sample for modifying the methods in AbstractHdfsBolt.

public void execute(tuple tuple) {                   try {                            long length = bytesmessage.getbodylength();             byte[] bytes = new byte[(int)length];             ///////////////////////////////////////             bytesmessage.readbytes(bytes);             string replymessage = new string(bytes, "utf-8");              datumreader = new specificdatumreader<indexedrecord>(schema);             decoder = decoderfactory.get().binarydecoder(bytes, null);              result = datumreader.read(null, decoder);                                            synchronized (this.writelock) {                                  datafilewriter.append(result);                                                       datafilewriter.sync();                 this.offset += bytes.length;                                    if (this.syncpolicy.mark(tuple, this.offset)) {                    if (this.out instanceof hdfsdataoutputstream) {                         ((hdfsdataoutputstream) this.out).hsync(enumset.of(syncflag.update_length));                     } else {                         this.out.hsync();                         this.out.flush();                     }                     this.syncpolicy.reset();                 }                datafilewriter.flush();             }              if(this.rotationpolicy.mark(tuple, this.offset)){                 rotateoutputfile(); // synchronized                 this.offset = 0;                 this.rotationpolicy.reset();             }         } catch (ioexception | jmsexception e) {             log.warn("write/sync failed.", e);             this.collector.fail(tuple);         }      }  @override void closeoutputfile() throws ioexception {     this.out.close(); }  @override path createoutputfile() throws ioexception {     path path = new path(this.filenameformat.getpath(), this.filenameformat.getname(this.rotation, system.currenttimemillis()));     this.out = this.fs.create(path);     datafilewriter.create(schema, 
out);     return path; }  @override void doprepare(map conf, topologycontext topologycontext,outputcollector collector) throws ioexception {     // todo auto-generated method stub      log.info("preparing hdfs bolt...");      try {              schema = new schema.parser().parse(new file("/home/*******/********schemafilename.avsc"));         } catch (ioexception e1) {                           e1.printstacktrace();         }      this.fs = filesystem.get(uri.create(this.fsurl), hdfsconfig);      datumwriter = new specificdatumwriter<indexedrecord>(schema);      datafilewriter = new datafilewriter<indexedrecord>(datumwriter);      jmsavroutils jasv = new jmsavroutils();          } 

Comments

Popular posts from this blog

twig - Using Twigbridge in a Laravel 5.1 Package -

jdbc - Not able to establish database connection in eclipse -

firemonkey - How do I make a beep sound in Android using Delphi and the API? -