How can I use RxJava in my Apache NiFi onTrigger method?


How does RxJava work inside NiFi? Or, how can NiFi and RxJava play nice together? They seem like such a perfect complement to one another.

I've run into a problem that I can't figure out how to solve. NiFi keeps complaining with an IllegalStateException or a FlowFileHandlingException, depending on the approach I take and how I read the FlowFile input stream.

I'm learning Apache NiFi and RxJava 2 (i.e., Flowables). I want to create an Apache NiFi processor that operates similarly to the existing SplitText processor, only simpler: no header processing, no fragment size processing, just pull out each line of data. I'm calling it SplitLine.

There is no fancy threading going on here, meaning I'm not trying to use Flowable.observeOn() or Flowable.subscribeOn(). Everything should be done on one thread: the current thread.

I thought I could solve this using RxJava: read the characters from the FlowFile and publish them using a shifted buffer. For example...

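    Flowable<Tuple<Long, Integer>> chars =
        Flowable.generate(
            () -> 0L,                        // initial state: character position
            (cnt, emitter) -> {
                int ch = flowStream.read();
                if (ch == -1) {
                    emitter.onComplete();    // EOF: complete without emitting -1 as a character
                } else {
                    emitter.onNext(new Tuple<>(cnt, ch));
                }
                return cnt + 1;              // "cnt++" would return the old value, so the count never advanced
            });

    return chars.buffer(2, 1).publish().autoConnect();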
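For anyone unfamiliar with the shifted buffer: buffer(2, 1) opens a new two-item window at every element, so each character is paired with the one after it, and the last character ends up in a window of its own. The plain-Java sketch below mimics that windowing over a list so it can be checked without RxJava; slidingPairs is a hypothetical helper, not part of RxJava, and it assumes RxJava 2's documented behavior of emitting trailing partial buffers when the source completes.

```java
import java.util.ArrayList;
import java.util.List;

class SlidingPairs {
    // Mimics Flowable.buffer(2, 1): every element starts a new window of up to 2 items.
    // The last element has no successor, so its window contains only itself.
    static <T> List<List<T>> slidingPairs(List<T> items) {
        List<List<T>> windows = new ArrayList<>();
        for (int i = 0; i < items.size(); i++) {
            // Clamp the end index so the final window has size 1 instead of overrunning.
            windows.add(new ArrayList<>(items.subList(i, Math.min(i + 2, items.size()))));
        }
        return windows;
    }

    public static void main(String[] args) {
        System.out.println(slidingPairs(List.of('a', 'b', 'c'))); // [[a, b], [b, c], [c]]
    }
}
```

That trailing single-element window is why any code consuming the pairs has to treat the second element as optional.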

I also tried the equivalent using Flowable.create...

    Flowable<Tuple<Long, Integer>> chars =
        Flowable.create(emitter -> {
            try {
                int ch;
                long cnt = 0;
                while ((ch = flowStream.read()) != -1) {
                    emitter.onNext(new Tuple<>(cnt, ch));
                    cnt++;
                }
                emitter.onComplete();
            } catch (IOException ex) {
                ex.printStackTrace();
                emitter.onError(ex);
            } finally {
                flowStream.close();   // was a bare block; it needs "finally" to always run
            }
        }, BackpressureStrategy.BUFFER);

    return chars.buffer(2, 1).publish().autoConnect();

In both cases above, I'm passing in the InputStream from the NiFi ProcessSession in the overridden onTrigger method of my Processor class.

    InputStream stream = session.read(flowFile);
    RxLineSplitter splitter = new RxLineSplitter(stream);

I have also tried using the callback version, but unsurprisingly received an exception because the stream was accessed from a callback other than the read callback. That is...

    session.read(flowFile, stream -> {
        RxLineSplitter splitter = new RxLineSplitter(stream);
        // RxLineSplitter contains the code above; that is the "other callback"
        // NiFi complains about...
    });

Why am I publishing the char stream? And why in pairs of chars? I have two subscribers on the char stream: one looks for the start of a line, the other looks for the end of a line. Because of Windows, I need to look for one of [\r; \n; or \r\n]. Basically, the second char in the pair is a lookahead.
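Here is a plain-Java sketch of the two pair-based strategies, run over a String rather than a Flowable so the lookahead rules can be checked in isolation. lineStarts and lineEnds are hypothetical names (the actual startingPositionStrategy and endingPositionStrategy aren't shown), and the rule chosen here is an assumption: a "\r\n" pair counts as a single terminator, and a trailing terminator does not start a new line.

```java
import java.util.ArrayList;
import java.util.List;

class LineMarkers {
    static boolean isTerminator(char c) {
        return c == '\r' || c == '\n';
    }

    // Start markers: 0 for non-empty input, plus the index after every '\n' that is followed by
    // more text, and after every '\r' not followed by '\n' (for "\r\n" the '\n' supplies the start).
    static List<Long> lineStarts(String text) {
        List<Long> starts = new ArrayList<>();
        if (!text.isEmpty()) starts.add(0L);
        for (int i = 0; i + 1 < text.length(); i++) {
            char fst = text.charAt(i);
            char snd = text.charAt(i + 1); // lookahead: the second char of the pair
            if (fst == '\n' || (fst == '\r' && snd != '\n')) starts.add((long) (i + 1));
        }
        return starts;
    }

    // End markers (exclusive): the index of every '\r', and of every '\n' not preceded by '\r',
    // detected one step early via the lookahead; plus end-of-input if the last line is unterminated.
    static List<Long> lineEnds(String text) {
        List<Long> ends = new ArrayList<>();
        // No pair has position 0 as its lookahead, so a leading terminator is checked directly.
        if (!text.isEmpty() && isTerminator(text.charAt(0))) ends.add(0L);
        for (int i = 0; i < text.length(); i++) {
            char fst = text.charAt(i);
            if (i + 1 < text.length()) {
                char snd = text.charAt(i + 1);
                if (snd == '\r' || (snd == '\n' && fst != '\r')) ends.add((long) (i + 1));
            } else if (!isTerminator(fst)) {
                ends.add((long) (i + 1)); // single-element window: the last line has no terminator
            }
        }
        return ends;
    }

    public static void main(String[] args) {
        String text = "a\r\nb\nc";
        System.out.println(lineStarts(text)); // [0, 3, 5]
        System.out.println(lineEnds(text));   // [1, 4, 6]
    }
}
```

One wrinkle this exposes: with lookahead only, a terminator at position 0 never appears as anyone's second char, so it has to be special-cased.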

In case you're interested, the crux of RxSplitLine looks like...

    Flowable<Tuple<Long, Integer>> findLineMarkers(
            Flowable<List<Tuple<Long, Integer>>> charPairs,
            BiFunction<Tuple<Long, Integer>, Optional<Tuple<Long, Integer>>, Optional<Tuple<Long, Integer>>> strategy) {
        return charPairs.map(pair -> {    // was charPairs(): it's a parameter, not a method
            Tuple<Long, Integer> fst = pair.get(0);
            // buffer(2, 1) ends with a one-element list, so the lookahead may be absent
            Optional<Tuple<Long, Integer>> snd = pair.size() > 1 ? Optional.of(pair.get(1)) : Optional.empty();
            return strategy.apply(fst, snd);
        }).filter(Optional::isPresent).map(Optional::get);
    }

    // Takes the shared char-pair stream: findLineMarkers expects pairs, not a raw InputStream
    Flowable<SplitInfo> split(Flowable<List<Tuple<Long, Integer>>> charPairs) {
        return findLineMarkers(charPairs, startingPositionStrategy)
                .zipWith(findLineMarkers(charPairs, endingPositionStrategy),
                         (s, e) -> new Split(s.item1, e.item1 - s.item1))
                .filter(split -> !removeEmptyLines || split.length > 0)
                .zipWith(counter(), Tuple::new)
                .timeInterval(TimeUnit.MILLISECONDS)
                .map(x -> new SplitInfo(x.value().item1.start,
                                        x.value().item1.length,
                                        x.value().item2,
                                        x.time(), x.unit()));
    }
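The zip step in split() pairs the n-th start marker with the n-th end marker, which only works if both marker streams emit the same number of markers in the same order. Below is a plain-Java sketch of that step over lists, fed with the markers for "a\n\nb" (lines "a", "" and "b"); zipSplits and this Split class are hypothetical stand-ins, not the actual types used above.

```java
import java.util.ArrayList;
import java.util.List;

class ZipSplits {
    // Hypothetical stand-in for Split(start, length).
    static final class Split {
        final long start;
        final long length;

        Split(long start, long length) {
            this.start = start;
            this.length = length;
        }

        @Override
        public String toString() {
            return "Split(" + start + ", " + length + ")";
        }
    }

    // Like zipWith: the i-th start pairs with the i-th end, stopping at the shorter list.
    // Like .filter(split -> !removeEmptyLines || split.length > 0): optionally drops empty lines.
    static List<Split> zipSplits(List<Long> starts, List<Long> ends, boolean removeEmptyLines) {
        List<Split> splits = new ArrayList<>();
        int n = Math.min(starts.size(), ends.size());
        for (int i = 0; i < n; i++) {
            Split split = new Split(starts.get(i), ends.get(i) - starts.get(i));
            if (!removeEmptyLines || split.length > 0) splits.add(split);
        }
        return splits;
    }

    public static void main(String[] args) {
        List<Long> starts = List.of(0L, 2L, 3L);
        List<Long> ends = List.of(1L, 2L, 4L);
        System.out.println(zipSplits(starts, ends, false)); // [Split(0, 1), Split(2, 0), Split(3, 1)]
        System.out.println(zipSplits(starts, ends, true));  // [Split(0, 1), Split(3, 1)]
    }
}
```

If one marker stream ever emits an extra or missing marker, every following split is paired with the wrong partner, so the two strategies have to agree exactly on what counts as a line.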

Enough rambling... I'd be grateful for any help or pointers in getting NiFi and RxJava 2 to play nice with one another.

I believe I have found the answer... or at least my SplitLine processor shows that it has received the flow file, and the bytes read are accurate too!

If you plan to read the input stream outside of the normal InputStreamCallback, the NiFi docs instruct you to use one of the other overloads of ProcessSession.read: InputStream input = session.read(flowFile). The docs also state that you are responsible for closing the stream. To that I might add: close the stream, and do so as eagerly as possible.

In RxJava 2, this means that closing the stream inside the Flowable.create method is not enough. You need to wrap Flowable.create in Flowable.using. Below are the modified constructor and method that worked for me...

A couple of highlights to note:

  • You might be tempted to pass the ProcessSession around and use it in the resourceSupplier of Flowable.using... That caused numerous headaches for me. YMMV, but I don't recommend it (if you do find a way, please let me know).

  • I made use of the Flowable.using overload that lets you specify the eager parameter. I set mine to true to eagerly close/dispose of the resource (the InputStream).

    RxLineSplitter(InputStream input, boolean removeEmptyLines) {
        this.inputStream = input;
        this.removeEmptyLines = removeEmptyLines;
    }

    private Flowable<List<Tuple<Long, Integer>>> getCharacters() {
        Flowable<Tuple<Long, Integer>> chars =
            Flowable.using(
                () -> this.inputStream,                          // resource supplier
                input -> Flowable.<Tuple<Long, Integer>>create(emitter -> {
                    try {
                        long cnt = 0;
                        while (true) {
                            int ch = input.read();
                            if (isEOF.test(ch)) break;           // isEOF is defined elsewhere (not shown)
                            emitter.onNext(new Tuple<>(cnt, ch));
                            ++cnt;
                        }
                        emitter.onComplete();
                    } catch (Exception ex) {
                        emitter.onError(ex);
                    }
                }, BackpressureStrategy.BUFFER),
                InputStream::close,                              // resource disposer
                true);                                           // eager: close before the terminal event

        return chars.buffer(2, 1);
    }
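To make the eager flag concrete: per the RxJava 2 Javadoc for Flowable.using, with eager set to true the disposer runs just before the terminal event (onComplete or onError) reaches downstream; with false it runs just after. The sketch below is a simplified, synchronous plain-Java model of that ordering only; UsingModel, run and trace are hypothetical names, it is not RxJava's implementation, and it ignores cancellation and errors.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

class UsingModel {
    // Simplified model of Flowable.using(resourceSupplier, sourceSupplier, disposer, eager):
    // acquire a resource, emit the items built from it, then dispose relative to completion.
    static <R, T> void run(Supplier<R> resourceSupplier,
                           Function<R, List<T>> sourceFactory,
                           Consumer<R> disposer,
                           boolean eager,
                           Consumer<T> onNext,
                           Runnable onComplete) {
        R resource = resourceSupplier.get();
        for (T item : sourceFactory.apply(resource)) {
            onNext.accept(item);
        }
        if (eager) {
            disposer.accept(resource); // eager: release first, then signal completion
            onComplete.run();
        } else {
            onComplete.run();          // non-eager: signal completion, then release
            disposer.accept(resource);
        }
    }

    // Records the order of events so the two modes can be compared.
    static List<String> trace(boolean eager) {
        List<String> events = new ArrayList<>();
        run(() -> "stream",
            r -> List.of("x", "y"),
            r -> events.add("close " + r),
            eager,
            item -> events.add("next " + item),
            () -> events.add("complete"));
        return events;
    }

    public static void main(String[] args) {
        System.out.println(trace(true));  // [next x, next y, close stream, complete]
        System.out.println(trace(false)); // [next x, next y, complete, close stream]
    }
}
```

With eager disposal, the InputStream is already closed by the time anything downstream reacts to completion, which is the "as eagerly as possible" behavior described above.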

Final thoughts:

  • I like that the supporting RxLineSplitter class has no dependencies on NiFi. This reduces coupling.

  • I don't like that the NiFi Processor.onTrigger method gets the InputStream but requires RxLineSplitter to close & dispose of it. This is discussed a bit in the documentation, but it feels dirty and error prone to me. Mitigating the above, the InputStream is used in only one method and is cleaned up in a pretty obvious and clear way with Flowable.using.

Hope this helps someone else... Time to see what other [learning] hurdles I encounter with NiFi and Rx.

