How does RxJava work inside NiFi? Or rather, how do NiFi and RxJava play nicely together? They seem like such a perfect complement to one another.
I've run into a problem that I can't figure out how to solve. NiFi keeps complaining with an IllegalStateException or a FlowFileHandlingException, depending on which approach I take to reading from the FlowFile input stream.
I'm learning Apache NiFi and RxJava 2 (i.e. Flowables). I want to create an Apache NiFi processor that operates similarly to the existing SplitText processor, only simpler. No header processing, no fragment size processing; it just pulls out each line of data. Call it SplitLine.
There is no fancy threading going on here, meaning I'm not trying to use Flowable.observeOn() or Flowable.subscribeOn(). Everything should be done on one thread... the current thread.
I thought I would solve this using RxJava. I would read characters from the FlowFile and publish them using a shifted buffer; for example...
    Flowable<Tuple<Long, Integer>> chars = Flowable.generate(
        () -> 0L,
        (cnt, emitter) -> {
            int ch = flowStream.read();
            emitter.onNext(new Tuple<>(cnt, ch));
            if (ch == -1) emitter.onComplete();
            return cnt + 1; // next state: position of the following character
        });
    return chars.buffer(2, 1).publish().autoConnect();

I also tried the equivalent using Flowable.create...
    Flowable<Tuple<Long, Integer>> chars = Flowable.create(emitter -> {
        try {
            int ch;
            long cnt = 0;
            while ((ch = flowStream.read()) != -1) {
                emitter.onNext(new Tuple<>(cnt, ch));
                cnt++;
            }
            emitter.onComplete();
        } catch (IOException ex) {
            ex.printStackTrace();
            emitter.onError(ex);
        } finally {
            flowStream.close();
        }
    }, BackpressureStrategy.BUFFER);
    return chars.buffer(2, 1).publish().autoConnect();

In both cases above, I'm passing in the InputStream from the NiFi ProcessSession in the overridden onTrigger method of the Processor class.
    InputStream stream = session.read(flowFile);
    RxLineSplitter splitter = new RxLineSplitter(stream);

I have also tried using the callback version, but unsurprisingly received an exception because the stream is accessed from a callback other than the read callback. That is...
    session.read(flowFile, stream -> {
        RxLineSplitter splitter = new RxLineSplitter(stream);
        // RxLineSplitter contains the code above, which is the "other callback" NiFi is complaining about...
    });

Why am I publishing the char stream? And why in pairs of chars? I have two subscribers on the char stream. One looks for the start of a line, the other looks for the end of a line. Because of Windows, I need to look for one of [\r, \n, or \r\n]. Basically, the second char in the pair is a lookahead.
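To make the lookahead idea concrete, here is a minimal plain-Java sketch (no RxJava, no NiFi). It is not the strategy code from the post: the class `SlidingPairs` and its methods `pairs` and `terminators` are hypothetical names for illustration. It mimics what `buffer(2, 1)` produces (overlapping windows of two, ending with a window of one) and uses the second element of each window to tell a lone \r from \r\n.

```java
import java.util.ArrayList;
import java.util.List;

final class SlidingPairs {

    /** Mimics buffer(2, 1): overlapping windows of two, plus a final window of one. */
    static List<List<Integer>> pairs(String text) {
        List<List<Integer>> out = new ArrayList<>();
        for (int i = 0; i < text.length(); i++) {
            List<Integer> window = new ArrayList<>();
            window.add((int) text.charAt(i));
            if (i + 1 < text.length()) {
                window.add((int) text.charAt(i + 1)); // the lookahead character
            }
            out.add(window);
        }
        return out;
    }

    /** Names each line terminator in order of appearance: "CR", "LF" or "CRLF". */
    static List<String> terminators(String text) {
        List<String> found = new ArrayList<>();
        boolean skipNext = false; // set when the previous window consumed this char as part of \r\n
        for (List<Integer> window : pairs(text)) {
            if (skipNext) {
                skipNext = false;
                continue;
            }
            int first = window.get(0);
            int second = window.size() > 1 ? window.get(1) : -1; // -1: no lookahead left
            if (first == '\r' && second == '\n') {
                found.add("CRLF");
                skipNext = true;
            } else if (first == '\r') {
                found.add("CR");
            } else if (first == '\n') {
                found.add("LF");
            }
        }
        return found;
    }
}
```

The final one-element window is the reason the lookahead has to be treated as optional: the last character of the input has nothing after it.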
In case you're interested, the crux of RxSplitLine looks like...
    Flowable<Tuple<Long, Integer>> findLineMarkers(
            Flowable<List<Tuple<Long, Integer>>> charPairs,
            BiFunction<Tuple<Long, Integer>, Optional<Tuple<Long, Integer>>, Optional<Tuple<Long, Integer>>> strategy) {
        return charPairs.map(pair -> {
            Tuple<Long, Integer> fst = pair.get(0);
            // The last window holds a single element, so the lookahead is optional.
            Optional<Tuple<Long, Integer>> snd =
                pair.size() > 1 ? Optional.of(pair.get(1)) : Optional.empty();
            return strategy.apply(fst, snd);
        }).filter(Optional::isPresent).map(Optional::get);
    }

    Flowable<SplitInfo> split(InputStream stream) throws IOException {
        // Note: findLineMarkers is declared above to take the buffered character pairs;
        // how they are derived from `stream` is not shown in this excerpt.
        return findLineMarkers(stream, startingPositionStrategy)
            .zipWith(findLineMarkers(stream, endingPositionStrategy),
                (s, e) -> new Split(s.item1, e.item1 - s.item1))
            .filter(split -> !removeEmptyLines || split.length > 0)
            .zipWith(counter(), Tuple::new)
            .timeInterval(TimeUnit.MILLISECONDS)
            .map(x -> new SplitInfo(x.value().item1.start, x.value().item1.length,
                x.value().item2, x.time(), x.unit()));
    }

Enough rambling... I'd be grateful for any help or pointers in getting NiFi and RxJava 2 to play nicely with one another.
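For readers following the split method above, the zipWith / filter / counter chain can be pictured with plain lists. This is only a sketch under assumptions: `ZipSplits`, its `Split` class and the `zip` method are hypothetical, and it assumes each end offset marks where the line's content stops (so that end minus start is the line length, as in the post).

```java
import java.util.ArrayList;
import java.util.List;

final class ZipSplits {

    /** One line: where it starts, how long it is, and its running number. */
    static final class Split {
        final long start;
        final long length;
        final long index;

        Split(long start, long length, long index) {
            this.start = start;
            this.length = length;
            this.index = index;
        }
    }

    /** Pairs the i-th start with the i-th end, optionally drops empty lines, then numbers the rest. */
    static List<Split> zip(List<Long> starts, List<Long> ends, boolean removeEmptyLines) {
        List<Split> out = new ArrayList<>();
        int n = Math.min(starts.size(), ends.size()); // a zip stops at the shorter input
        long index = 0;
        for (int i = 0; i < n; i++) {
            long start = starts.get(i);
            long length = ends.get(i) - start;
            if (removeEmptyLines && length == 0) {
                continue; // filtered before numbering, so indexes stay contiguous
            }
            out.add(new Split(start, length, index++));
        }
        return out;
    }
}
```

Numbering after the filter mirrors the order of operators in the post, where the empty-line filter comes before the zipWith on the counter.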
I believe I have found the answer... at least the SplitLine processor shows that it has received the flow file, and the number of bytes read is accurate too!
If you plan to read an input stream outside of the normal InputStreamCallback, the NiFi docs instruct you to use one of the other overloads on ProcessSession.read: InputStream input = session.read(flowFile). The docs also state that you are responsible for closing the stream. Having tried this as well, I might add... close the stream as eagerly as possible.
In RxJava 2, this means that calling close inside the Flowable.create method is not enough. You need to wrap Flowable.using around Flowable.create. Below are the modified constructor and method that worked...
A couple of highlights to note:
- You might be tempted to pass the ProcessSession around and use it in the resourceSupplier of Flowable.using... This caused numerous headaches for me. YMMV, but I don't recommend it (if you find a way, please let me know).
- I made use of the Flowable.using overload that allows you to specify the eager parameter. I set mine to true to eagerly close/dispose of the resource (the InputStream).
    RxLineSplitter(InputStream input, boolean removeEmptyLines) {
        this.inputStream = input;
        this.removeEmptyLines = removeEmptyLines;
    }

    private Flowable<List<Tuple<Long, Integer>>> getCharacters() {
        Flowable<Tuple<Long, Integer>> chars = Flowable.using(
            () -> this.inputStream,              // resource supplier
            input -> Flowable.create(emitter -> {
                try {
                    long cnt = 0;
                    while (true) {
                        int ch = input.read();
                        if (isEOF.test(ch)) break;
                        emitter.onNext(new Tuple<>(cnt, ch));
                        ++cnt;
                    }
                    emitter.onComplete();
                } catch (Exception ex) {
                    emitter.onError(ex);
                }
            }, BackpressureStrategy.BUFFER),
            InputStream::close,                  // resource disposer
            true);                               // eager: close before the terminal event is relayed
        return chars.buffer(2, 1);
    }

Final thoughts:
- I like that the supporting RxLineSplitter class has no dependencies on NiFi. That reduces coupling.
- I don't like that the NiFi Processor.onTrigger method gets the InputStream but requires RxLineSplitter to close and dispose of it. This is discussed a bit in the documentation, but it feels dirty and error-prone to me. Mitigating the above, the InputStream is only used in one method and is cleaned up in a pretty obvious and clear way via Flowable.using.
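Both of these points can be checked without a NiFi runtime. The sketch below is an illustration only, not the code from this answer: `StreamOwnership`, `TrackingStream` and `countNewlines` are hypothetical names, and plain try-with-resources stands in for Flowable.using. It shows a consumer that depends only on InputStream, together with a test double that records whether the consumer really closed the stream it was handed.

```java
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

final class StreamOwnership {

    /** Test double: wraps any stream and records whether close() was called. */
    static final class TrackingStream extends FilterInputStream {
        boolean closed = false;

        TrackingStream(InputStream in) {
            super(in);
        }

        @Override
        public void close() throws IOException {
            closed = true;
            super.close();
        }
    }

    /** Hypothetical consumer: counts '\n' bytes and closes the stream it was given. */
    static int countNewlines(InputStream input) {
        // try-with-resources closes the stream as soon as reading finishes or fails
        try (InputStream in = input) {
            int count = 0;
            int ch;
            while ((ch = in.read()) != -1) {
                if (ch == '\n') {
                    count++;
                }
            }
            return count;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In a unit test, a ByteArrayInputStream wrapped in TrackingStream can be passed to the consumer, and the closed flag asserted afterwards. The same kind of check could be pointed at a class like RxLineSplitter, since it only needs an InputStream.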
Hope this helps someone else... Time to see what other [learning] hurdles I encounter with NiFi and Rx.