C# - Blocked on block design for data flow
We have a data processing pipeline that we're trying to use the TPL Dataflow framework for.

The basic gist of the pipeline:

- Iterate through CSV files on the filesystem (10,000 of them)
- Verify we haven't already imported the contents; if we have, ignore the file
- Iterate through the contents of a single CSV file (20,000-120,000 rows) and create a data structure that fits our needs
- Batch 100 of these new data-structure items and push them to the database
- Mark the CSV file as imported
We currently do the above with an existing Python script, in a slow and painful way - the code is a mess. My thinking, after looking at TPL Dataflow, is the following:

1. BufferBlock<string> to post the files into
2. TransformBlock<string, SensorDataDto> with a predicate to detect whether to import the file
3. TransformBlock<string, SensorDataDto> that reads through the CSV file and creates the SensorDataDto structure
4. BatchBlock<SensorDataDto> used within the TransformBlock delegate to batch up 100 requests
5. ActionBlock<SensorDataDto> to push the 100 records into the database
6. ActionBlock to mark the CSV as imported
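The block layout described above can be sketched roughly as follows. This is a minimal, self-contained sketch, not the asker's actual code: the CSV parser, the import check, and the database insert are stand-in placeholders, plain strings stand in for SensorDataDto, and a TransformManyBlock is used to flatten one file into many rows instead of posting into the BatchBlock from inside a delegate.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks.Dataflow;

// Placeholder parser: pretend every file yields 230 rows.
IEnumerable<string> ParseCsv(string path) =>
    Enumerable.Range(0, 230).Select(i => path + ":" + i);

// Placeholder import check: import everything.
bool ShouldImportFile(string path) => true;

var rowsInserted = 0;
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

var bufferBlock = new BufferBlock<string>();

// TransformManyBlock flattens one file into many rows, which avoids
// having to Post into the BatchBlock from inside a delegate.
var readCsvFile = new TransformManyBlock<string, string>(path => ParseCsv(path));

var batchBlock = new BatchBlock<string>(100);

// Stand-in for the 100-row database insert.
var insertSensorDataBlock = new ActionBlock<string[]>(batch => rowsInserted += batch.Length);

bufferBlock.LinkTo(readCsvFile, linkOptions, ShouldImportFile);
bufferBlock.LinkTo(DataflowBlock.NullTarget<string>()); // drop already-imported files
readCsvFile.LinkTo(batchBlock, linkOptions);
batchBlock.LinkTo(insertSensorDataBlock, linkOptions);

foreach (var file in new[] { "a.csv", "b.csv" }) bufferBlock.Post(file);
bufferBlock.Complete();
insertSensorDataBlock.Completion.Wait();

Console.WriteLine(rowsInserted); // 460: the final partial batch of 60 is flushed on completion
```

Note that this sketch does not solve the "mark the file as imported only after its rows are written" requirement, which is the hard part of the question.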
I've created the first few operations and they're working (BufferBlock -> TransformBlock + predicate, processing the file if it hasn't been imported), but I'm unsure how to continue the flow so that I can post batches of 100 to the BatchBlock from within the TransformBlock, and then wire in the following actions.

Does this look right - is the basic gist sound, and how do I tackle the BufferBlock bits in a TPL-Dataflow-y way?
bufferBlock.LinkTo(readCsvFile, shouldImportFile);
bufferBlock.LinkTo(DataflowBlock.NullTarget<string>());
readCsvFile.LinkTo(normaliseData);
normaliseData.LinkTo(updateCsvImport);
updateCsvImport.LinkTo(completionBlock);
batchBlock.LinkTo(insertSensorDataBlock);

bufferBlock.Completion.ContinueWith(t => readCsvFile.Complete());
readCsvFile.Completion.ContinueWith(t => normaliseData.Complete());
normaliseData.Completion.ContinueWith(t => updateCsvImport.Complete());
updateCsvImport.Completion.ContinueWith(t => completionBlock.Complete());
batchBlock.Completion.ContinueWith(t => insertSensorDataBlock.Complete());
Inside the normaliseData method I'm calling batchBlock.Post<..>(..). Is that an acceptable pattern, or should it be structured differently? My problem is that I can only mark a file as imported after all of its records have been pushed through.

Task.WhenAll(bufferBlock.Completion, batchBlock.Completion).Wait();

If I have a batch size of 100 and only 80 have been pushed in, is there a way to drain the last 80?

I wasn't sure whether I should link the batchBlock into the main pipeline and just wait till both are finished.
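On the "drain the last 80" question specifically: completing a BatchBlock<T> flushes whatever it still holds as a final, smaller batch, and BatchBlock<T>.TriggerBatch() forces that flush on demand (useful mid-stream, e.g. from a timer). A minimal sketch with made-up numbers:

```csharp
using System;
using System.Threading.Tasks.Dataflow;

var received = 0;
var batchBlock = new BatchBlock<int>(100);
var sink = new ActionBlock<int[]>(batch => received += batch.Length);
batchBlock.LinkTo(sink, new DataflowLinkOptions { PropagateCompletion = true });

// Only 80 of a 100-item batch arrive.
for (var i = 0; i < 80; i++) batchBlock.Post(i);

// Option 1: force the partial batch out mid-stream.
batchBlock.TriggerBatch(); // emits a batch of 80 immediately

// Option 2: Complete() also flushes any remaining buffered items as a short batch.
batchBlock.Complete();
sink.Completion.Wait();

Console.WriteLine(received); // 80
```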
First of all, you don't need to use Completion in that manner - you can use the PropagateCompletion property during the link:

// with a predicate
bufferBlock.LinkTo(readCsvFile, new DataflowLinkOptions { PropagateCompletion = true }, shouldImportFile);
// without a predicate
readCsvFile.LinkTo(normaliseData, new DataflowLinkOptions { PropagateCompletion = true });
Now, the problem with batches. Maybe you can use a JoinBlock<T1, T2> or a BatchedJoinBlock<T1, T2> here, attaching them to your pipeline and gathering the results of the joins, so you get the full picture of the work being done. Alternatively, you can implement your own ITargetBlock<TInput> so you can consume the messages in your own way.

According to the official docs, blocks are greedy and gather data from a linked block as soon as it becomes available, so a join block may get stuck if one target is ready and the other is not, or a batch block may sit at 80% of its batch size; you need to keep that in mind. In the case of your own implementation you can use the ITargetBlock<TInput>.OfferMessage method to get information from the sources.
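A minimal sketch of such a custom target - a hypothetical wrapper that observes every OfferMessage call while delegating the real work to an inner block (the class name and counting behaviour are illustrative, not from the question):

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Demo usage: wrap an ActionBlock and count what it accepts.
var counter = new CountingTarget<int>(new ActionBlock<int>(_ => { }));
foreach (var i in new[] { 1, 2, 3 }) counter.Post(i);
counter.Complete();
counter.Completion.Wait();
Console.WriteLine(counter.Count); // 3

// A custom ITargetBlock<T> that forwards offers to an inner block and
// counts the messages the inner block accepts.
public class CountingTarget<T> : ITargetBlock<T>
{
    private readonly ITargetBlock<T> _inner;
    private int _count;
    public int Count => _count;

    public CountingTarget(ITargetBlock<T> inner) => _inner = inner;

    public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader,
        T messageValue, ISourceBlock<T> source, bool consumeToAccept)
    {
        var status = _inner.OfferMessage(messageHeader, messageValue, source, consumeToAccept);
        if (status == DataflowMessageStatus.Accepted)
            Interlocked.Increment(ref _count);
        return status;
    }

    // IDataflowBlock members simply delegate to the inner block.
    public void Complete() => _inner.Complete();
    public void Fault(Exception exception) => _inner.Fault(exception);
    public Task Completion => _inner.Completion;
}
```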
BatchBlock<T> is capable of executing in both greedy and non-greedy modes. In the default greedy mode, all messages offered to the block from any number of sources are accepted and buffered to be converted into batches. In non-greedy mode, all messages are postponed from sources until enough sources have offered messages to the block to create a batch. Thus, a BatchBlock<T> can be used to receive 1 element from each of N sources, N elements from 1 source, and a myriad of options in between.