How does RxJava work inside NiFi? Or rather, how do NiFi and RxJava play nicely together? They seem like such a perfect complement to one another.
I've run into a problem that I can't figure out how to solve. NiFi keeps complaining about an IllegalStateException or a FlowFileHandlingException, depending on which approach I take to reading from the FlowFile input stream.
I'm learning Apache NiFi and RxJava 2 (i.e. Flowables). I want to create an Apache NiFi processor that operates similarly to the existing SplitText processor, only simpler. No header processing, no fragment size processing; it just pulls out each line of data. Let's call it SplitLine.
There is no fancy threading going on here, meaning I'm not trying to use Flowable.observeOn() or Flowable.subscribeOn(). Everything should be done on one thread: the current thread.
I thought I would solve this using RxJava. I would read characters from the FlowFile and publish them using a shifted buffer; for example...
Flowable<Tuple<Long, Integer>> chars = Flowable.generate(
    () -> 0L,
    (cnt, emitter) -> {
        int ch = flowStream.read();
        emitter.onNext(new Tuple<>(cnt, ch));
        if (ch == -1) emitter.onComplete();
        return cnt + 1; // "return cnt++" would hand back the old value, so the counter would never advance
    });
return chars.buffer(2, 1).publish().autoConnect();
I also tried the equivalent using Flowable.create...
Flowable<Tuple<Long, Integer>> chars = Flowable.create(emitter -> {
    try {
        int ch;
        long cnt = 0;
        // Emit (position, character) pairs until end of stream
        while ((ch = flowStream.read()) != -1) {
            emitter.onNext(new Tuple<>(cnt, ch));
            cnt++;
        }
        emitter.onComplete();
    } catch (IOException ex) {
        ex.printStackTrace();
        emitter.onError(ex);
    } finally {
        flowStream.close();
    }
}, BackpressureStrategy.BUFFER);
return chars.buffer(2, 1).publish().autoConnect();
In both cases above, I am passing in the InputStream from the NiFi ProcessSession in the overridden onTrigger method of the processor class.
InputStream stream = session.read(flowFile);
RxLineSplitter splitter = new RxLineSplitter(stream);
I have also tried using the callback version, but unsurprisingly received an exception because the stream is accessed from a callback other than the read callback. That is...
session.read(flowFile, stream -> {
    RxLineSplitter splitter = new RxLineSplitter(stream);
    // RxLineSplitter contains the code above, i.e. the other callback that NiFi is complaining about...
});
Why am I publishing the char stream? And why in pairs of chars? I have two subscribers on the char stream. One looks for the start of a line, the other looks for the end of a line. Because of Windows, I need to look for one of [\r, \n, or \r\n]. Basically, the second char in the pair is a lookahead.
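To make the lookahead idea concrete, here is a minimal plain-Java sketch (no RxJava, no NiFi) that walks a stream as (current, next) pairs and records where each line ends. The class and method names are hypothetical and are not part of RxLineSplitter; the point is only that one character of lookahead is enough to treat \r, \n, and \r\n as a single terminator each.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not part of the original RxLineSplitter.
final class LookaheadLineEnds {

    /** Returns the offset just past each line terminator (\r, \n, or \r\n). */
    static List<Long> lineEndOffsets(InputStream in) throws IOException {
        List<Long> ends = new ArrayList<>();
        long pos = 0;
        int current = in.read();
        while (current != -1) {
            int next = in.read(); // the lookahead character (-1 at end of stream)
            if (current == '\n') {
                ends.add(pos + 1); // covers both a lone \n and the \n of \r\n
            } else if (current == '\r' && next != '\n') {
                ends.add(pos + 1); // a lone \r
            }
            // A \r followed by \n is skipped here; the \n closes the line on the next pass.
            current = next;
            pos++;
        }
        return ends;
    }

    /** Convenience overload for trying the sketch on a string. */
    static List<Long> lineEndOffsets(String text) {
        byte[] bytes = text.getBytes(StandardCharsets.US_ASCII);
        try {
            return lineEndOffsets(new ByteArrayInputStream(bytes));
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }
}
```

For the input "a\r\nb\nc\rd" the sketch reports line ends at offsets 3, 5, and 7, one per terminator regardless of its style.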
In case you're interested, the crux of RxSplitLine looks like...
Flowable<Tuple<Long, Integer>> findLineMarkers(
        Flowable<List<Tuple<Long, Integer>>> charPairs,
        BiFunction<Tuple<Long, Integer>, Optional<Tuple<Long, Integer>>, Optional<Tuple<Long, Integer>>> strategy) {
    return charPairs.map(pair -> {
        Tuple<Long, Integer> fst = pair.get(0);
        // The last buffer holds a single element, so the lookahead may be absent
        Optional<Tuple<Long, Integer>> snd = pair.size() > 1 ? Optional.of(pair.get(1)) : Optional.empty();
        return strategy.apply(fst, snd);
    }).filter(Optional::isPresent).map(Optional::get);
}

Flowable<SplitInfo> split(InputStream stream) throws IOException {
    return findLineMarkers(stream, startingPositionStrategy)
        .zipWith(findLineMarkers(stream, endingPositionStrategy),
                 (s, e) -> new Split(s.item1, e.item1 - s.item1))
        .filter(split -> !removeEmptyLines || split.length > 0)
        .zipWith(counter(), Tuple::new)
        .timeInterval(TimeUnit.MILLISECONDS)
        .map(x -> new SplitInfo(x.value().item1.start, x.value().item1.length,
                                x.value().item2, x.time(), x.unit()));
}
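The zipWith and filter steps above can be sketched without RxJava. This is only an illustration under assumptions: the Split class here is a hypothetical stand-in with the start and length fields the post refers to, and the two lists stand in for the start-marker and end-marker streams.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for illustration; the real Split class is not shown in the post.
final class SplitPairing {

    static final class Split {
        final long start;
        final long length;

        Split(long start, long length) {
            this.start = start;
            this.length = length;
        }
    }

    /** Pairs the i-th start with the i-th end, like zipWith, then optionally drops empty lines. */
    static List<Split> pair(List<Long> starts, List<Long> ends, boolean removeEmptyLines) {
        List<Split> out = new ArrayList<>();
        int n = Math.min(starts.size(), ends.size()); // zip stops at the shorter input
        for (int i = 0; i < n; i++) {
            long start = starts.get(i);
            long length = ends.get(i) - start;
            if (!removeEmptyLines || length > 0) {
                out.add(new Split(start, length));
            }
        }
        return out;
    }
}
```

For the text "ab\n\ncd\n", the starts are 0, 3, 4 and the terminators sit at 2, 3, 6, so the lengths come out as 2, 0, 2 and the empty middle line is dropped when removeEmptyLines is true.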
Enough rambling... I'd be grateful for any help or pointers in getting NiFi and RxJava 2 to play nicely with one another.
I believe I have found the answer... at least the SplitLine processor shows that it has received the flow file, and the bytes read are accurate too!
If you plan to read from the input stream outside the normal InputStreamCallback, the NiFi docs instruct you to use one of the other overloads on ProcessSession.read: InputStream input = session.read(flowFile). The docs also state that you are responsible for closing the stream. I would add one thing to that: close the stream as eagerly as possible.
In RxJava 2, this means that closing the stream inside the Flowable.create method is not enough. You need to wrap Flowable.using around Flowable.create. Below are the modified constructor and method that worked...
A couple of highlights to note:
You might be tempted to pass the ProcessSession around and use it in the resourceSupplier of Flowable.using... this caused numerous headaches for me. YMMV, but I don't recommend it (if you find a way, please let me know).
I made use of the Flowable.using overload that allows you to specify the eager parameter. I set mine to true to eagerly close/dispose the resource (the InputStream).
RxLineSplitter(InputStream input, boolean removeEmptyLines) {
    this.inputStream = input;
    this.removeEmptyLines = removeEmptyLines;
}

private Flowable<List<Tuple<Long, Integer>>> getCharacters() {
    Flowable<Tuple<Long, Integer>> chars = Flowable.using(
        () -> this.inputStream,                  // resource supplier
        input -> Flowable.create(emitter -> {    // source built from the resource
            try {
                long cnt = 0;
                while (true) {
                    int ch = input.read();
                    if (isEof.test(ch)) break;
                    emitter.onNext(new Tuple<>(cnt, ch));
                    ++cnt;
                }
                emitter.onComplete();
            } catch (Exception ex) {
                emitter.onError(ex);
            }
        }, BackpressureStrategy.BUFFER),
        InputStream::close,                      // disposer
        true);                                   // eager: close before the terminal event
    return chars.buffer(2, 1);
}
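The ordering that eager disposal gives you can be illustrated in plain Java. This is a sketch of the idea only, not of RxJava's internals: the names are hypothetical, and try-with-resources stands in for Flowable.using with eager set to true, so the stream is closed before the completion signal is delivered.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntConsumer;

// Hypothetical illustration of "close first, then signal completion"; not RxJava code.
final class EagerCloseSketch {

    /** Reads every byte, closes the stream, and only then signals completion. */
    static void readThenComplete(InputStream source, IntConsumer onNext, Runnable onComplete)
            throws IOException {
        try (InputStream in = source) {
            int ch;
            while ((ch = in.read()) != -1) {
                onNext.accept(ch);
            }
        } // the stream is closed here, before the terminal signal below
        onComplete.run();
    }

    /** Records the order of events for a two-byte stream. */
    static List<String> demo() {
        List<String> events = new ArrayList<>();
        InputStream tracked = new ByteArrayInputStream(new byte[] {'h', 'i'}) {
            @Override
            public void close() throws IOException {
                events.add("close");
                super.close();
            }
        };
        try {
            readThenComplete(tracked,
                    ch -> events.add("next:" + (char) ch),
                    () -> events.add("complete"));
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        return events;
    }
}
```

Calling EagerCloseSketch.demo() records next:h, next:i, close, complete, in that order. By the time anything downstream sees completion, the stream has already been released, which is the property that matters when NiFi expects the stream to be closed before the session is committed.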
Final thoughts:
I like that the supporting RxLineSplitter class has no dependencies on NiFi. It reduces coupling.
I don't like that the NiFi Processor.onTrigger method gets the InputStream but requires RxLineSplitter to close and dispose of it. This is discussed a bit in the documentation, but it feels dirty and error-prone to me. Mitigating the above, the InputStream is only used in one method and is cleaned up in a pretty obvious and clear way via Flowable.using.
Hope this helps someone else... time to see what other [learning] hurdles I encounter with NiFi and Rx.