C# - Blocked on block design for data flow
We have a data processing pipeline that we're trying to use the TPL Dataflow framework for.
The basic gist of the pipeline:
- Iterate through the CSV files on the filesystem (around 10,000 of them)
- Verify we haven't already imported the contents; if we have, ignore the file
- Iterate through the contents of a single CSV file (20,000-120,000 rows) and create a data structure that fits our needs (roughly sketched after this list)
- Batch 100 of these new data structure items at a time and push them to a database
- Mark the CSV file as imported
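For context, the per-file work boils down to a projection from CSV rows to a DTO. Here is a rough sketch of that shape; the DTO fields, the column layout, and the parsing are placeholders rather than our real code:

    using System;
    using System.Collections.Generic;
    using System.IO;
    using System.Linq;

    // Placeholder DTO; the real one carries the sensor fields we need.
    public class SensorDataDto
    {
        public DateTime Timestamp { get; set; }
        public double Value { get; set; }
    }

    public static class CsvImport
    {
        // Reads one CSV file (20,000-120,000 rows) and yields a DTO per row.
        public static IEnumerable<SensorDataDto> ParseFile(string path)
        {
            foreach (var line in File.ReadLines(path).Skip(1))    // skip the header row
            {
                var columns = line.Split(',');
                yield return new SensorDataDto
                {
                    Timestamp = DateTime.Parse(columns[0]),
                    Value = double.Parse(columns[1])
                };
            }
        }
    }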
We currently have an existing Python script that does the above in a slow and painful way; the code is a mess.
My thinking is the following, after looking at TPL Dataflow:
1. BufferBlock<string> to post the files into
2. TransformBlock<string, SensorDataDto> with a predicate to detect whether to import the file
3. TransformBlock<string, SensorDataDto> that reads through the CSV file and creates the SensorDataDto structure
4. BatchBlock<SensorDataDto> used within the TransformBlock delegate to batch up 100 requests
5. ActionBlock<SensorDataDto> to push the 100 records into the database
6. ActionBlock to mark the CSV file as imported
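To make that list concrete, the block declarations might look something like the following. This is only a sketch: the delegates are placeholders, the batch size of 100 comes from the gist above, and SensorDataDto is the DTO sketched earlier.

    using System;
    using System.Threading.Tasks.Dataflow;

    Predicate<string> shouldImportFile = path => true;            // 2. placeholder predicate

    var bufferBlock = new BufferBlock<string>();                  // 1. file paths get posted in here

    var readCsvFile = new TransformBlock<string, SensorDataDto>(  // 3. stand-in for the CSV parsing
        path => new SensorDataDto());

    var batchBlock = new BatchBlock<SensorDataDto>(100);          // 4. groups items into batches of 100

    var insertSensorData = new ActionBlock<SensorDataDto[]>(      // 5. push the 100 records to the database
        batch => { /* database insert goes here */ });

    var updateCsvImport = new ActionBlock<string>(                // 6. mark the file as imported
        path => { /* flag the file as done */ });

    // The predicate from step 2 lives in the LinkTo call rather than in its own block;
    // anything it rejects is discarded via a null target.
    bufferBlock.LinkTo(readCsvFile, shouldImportFile);
    bufferBlock.LinkTo(DataflowBlock.NullTarget<string>());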
I've created the first few operations and they're working (BufferBlock -> TransformBlock with the predicate, processing only files that haven't been imported), but I'm unsure how to continue the flow so that I can post batches of 100 to the BatchBlock from within the TransformBlock, and how to wire up the following actions.
Does this sound about right as a basic gist, and how do I tackle the BufferBlock bits in a TPL Dataflow-y way?
    bufferBlock.LinkTo(readCsvFile, shouldImportFile);
    bufferBlock.LinkTo(DataflowBlock.NullTarget<string>());
    readCsvFile.LinkTo(normaliseData);
    normaliseData.LinkTo(updateCsvImport);
    updateCsvImport.LinkTo(completionBlock);
    batchBlock.LinkTo(insertSensorDataBlock);

    bufferBlock.Completion.ContinueWith(t => readCsvFile.Complete());
    readCsvFile.Completion.ContinueWith(t => normaliseData.Complete());
    normaliseData.Completion.ContinueWith(t => updateCsvImport.Complete());
    updateCsvImport.Completion.ContinueWith(t => completionBlock.Complete());
    batchBlock.Completion.ContinueWith(t => insertSensorDataBlock.Complete());

Inside the normaliseData method I'm calling batchBlock.Post<..>(...). Is that a good pattern, or should it be structured differently? My problem is that I can only mark a file as imported after its records have been pushed through.
    Task.WhenAll(bufferBlock.Completion, batchBlock.Completion).Wait();

If I have a batch size of 100 but only 80 items get pushed in, is there a way to drain those last 80?

I wasn't sure whether I should link the BatchBlock into the main pipeline, or just wait until both have finished.
First of all, you don't need to use Completion in that manner; you can use the PropagateCompletion option during the link:
    // with a predicate
    bufferBlock.LinkTo(readCsvFile, new DataflowLinkOptions { PropagateCompletion = true }, shouldImportFile);
    // without a predicate
    readCsvFile.LinkTo(normaliseData, new DataflowLinkOptions { PropagateCompletion = true });

Now, about your problem with the batches. Maybe you can use a JoinBlock<T1, T2> or BatchedJoinBlock<T1, T2> here, attaching them to your pipeline and gathering the results of the joins so that you get the full picture of the work being done. Alternatively, you can implement your own ITargetBlock<TInput> and consume the messages in your own way.
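Applied to the wiring in the question, the whole ContinueWith chain disappears. A sketch, reusing the question's block names and assuming batchBlock is still being fed manually from inside normaliseData:

    var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

    bufferBlock.LinkTo(readCsvFile, linkOptions, shouldImportFile);
    bufferBlock.LinkTo(DataflowBlock.NullTarget<string>());     // files rejected by the predicate are dropped
    readCsvFile.LinkTo(normaliseData, linkOptions);
    normaliseData.LinkTo(updateCsvImport, linkOptions);
    updateCsvImport.LinkTo(completionBlock, linkOptions);
    batchBlock.LinkTo(insertSensorDataBlock, linkOptions);

    // Completing the head of the chain now ripples down through every link.
    bufferBlock.Complete();

    // Nothing is linked *into* batchBlock, so completion does not propagate to it;
    // complete it once the block that feeds it is done. Completing a BatchBlock
    // flushes whatever is left as a final, smaller batch (your "last 80").
    normaliseData.Completion.ContinueWith(_ => batchBlock.Complete());

    Task.WhenAll(completionBlock.Completion, insertSensorDataBlock.Completion).Wait();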
According to the official docs, blocks are greedy by default and gather data from linked blocks as soon as it becomes available. Because of this, join blocks may get stuck if one target is ready and another is not, and a batch block can sit at 80% of its batch size, so you need to keep that in mind. In the case of your own implementation, you can use the ITargetBlock<TInput>.OfferMessage method to get information from the sources.
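If you go the custom route, the interface is fairly small. Here is a minimal, illustrative-only sketch of an ITargetBlock<TInput> that sees every offer in OfferMessage and then forwards it to an inner ActionBlock; the counting is just a stand-in for whatever "information from the sources" you actually need:

    using System;
    using System.Threading;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    // A pass-through target: every message offered to it goes through OfferMessage,
    // where you can inspect the source, postpone, decline, or buffer for batching.
    public class InspectingTarget<T> : ITargetBlock<T>
    {
        private readonly ITargetBlock<T> _inner;
        private long _offered;

        public InspectingTarget(Action<T> action)
        {
            _inner = new ActionBlock<T>(action);
        }

        public long Offered => Interlocked.Read(ref _offered);

        public Task Completion => _inner.Completion;

        public void Complete() => _inner.Complete();

        public void Fault(Exception exception) => _inner.Fault(exception);

        public DataflowMessageStatus OfferMessage(
            DataflowMessageHeader messageHeader,
            T messageValue,
            ISourceBlock<T> source,
            bool consumeToAccept)
        {
            Interlocked.Increment(ref _offered);
            // Blindly accept here; a smarter implementation could return
            // DataflowMessageStatus.Postponed and consume the message later.
            return _inner.OfferMessage(messageHeader, messageValue, source, consumeToAccept);
        }
    }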
BatchBlock<T> is capable of executing in both greedy and non-greedy modes. In the default greedy mode, all messages offered to the block from any number of sources are accepted and buffered to be converted into batches. In non-greedy mode, all messages are postponed from sources until enough sources have offered messages to the block to create a batch. Thus, a BatchBlock<T> can be used to receive 1 element from each of N sources, N elements from 1 source, and a myriad of options in between.
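In code, greedy is the default, non-greedy is opted into through GroupingDataflowBlockOptions, and a partially filled batch can be pushed out explicitly (the SensorDataDto type and batch size of 100 are just the question's, used here for illustration):

    using System.Threading.Tasks.Dataflow;

    // Greedy (default): buffers everything it is offered and emits an array
    // of 100 items whenever the buffer reaches the batch size.
    var greedyBatch = new BatchBlock<SensorDataDto>(100);

    // Non-greedy: postpones messages until enough sources have offered items
    // to form a complete batch.
    var nonGreedyBatch = new BatchBlock<SensorDataDto>(
        100,
        new GroupingDataflowBlockOptions { Greedy = false });

    // A partial batch (e.g. the last 80 items) can be forced out at any time,
    // and Complete() flushes whatever remains as a final, smaller batch.
    greedyBatch.TriggerBatch();
    greedyBatch.Complete();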