c# - Blocked on block design for data flow -


we have data processing pipeline we're trying use tpl dataflow framework for.

basic gist of pipeline:

  1. iterate through csv files on filesystem (10,000)
  2. verify haven't imported contents, if have ignore
  3. iterate through contents of single csv file (20,000-120,000 rows) , create datastructure fits our needs.
  4. batch 100 of these new datastructured items , push them database
  5. mark csv file being imported.

now have existing python file above in slow & painful way - code mess.

my thinking following looking @ tpl dataflow.

  1. bufferblock<string> post files into
  2. transformblock<string, sensordatadto> predicate detect whether import file
  3. transformblock<string, sensordatadto> reads through csv file , creates sensordatadto structure
  4. batchblock<sensordatadto> used within transformblock delegate batch 100 requests.

    4.5. actionblock<sensordatadto> push 100 records database.

  5. actionblock mark csv imported.

i've created first few operations , they're working (bufferblock -> transformblock + predicate && process if hasn't) i'm unsure how continue flow can post 100 batchblock within transformblock , wire following actions.

does right - basic gist, , how tackle bufferblock bits in tpl data flowy way?

bufferblock.linkto(readcsvfile, shouldimportfile) bufferblock.linkto(dataflowblock.nulltarget<string>()) readcsvfile.linkto(normalisedata) normalisedata.linkto(updatecsvimport) updatecsvimport.linkto(completionblock)  batchblock.linkto(insertsensordatablock)  bufferblock.completion.continuewith(t => readcsvfile.complete()); readcsvfile.completion.continuewith(t => normalisedata.complete()); normalisedata.completion.continuewith(t => updatecsvimport.complete()); updatecsvimport.completion.continuewith(t => completionblock.complete());  batchblock.completion.continuewith(t => insertsensordatablock.complete()); 

inside normalisedata method i'm calling batchblock.post<..>(...), pattern or should structured differently? problem can mark file being imported after records have been pushed through.

task.whenall(bufferblock.completion, batchblock.completion).wait(); 

if have batch of 100, if 80 pushed in, there way drain last 80?

i wasn't sure if should link batchblock in main pipeline, wait till both finished.

first of all, don't need use completion in matter, can use propagatecompletion property during link:

// predicate bufferblock.linkto(readcsvfile, new dataflowlinkoptions { propagatecompletion = true }, shouldimportfile); // without predicate readcsvfile.linkto(normalisedata, new dataflowlinkoptions { propagatecompletion = true }); 

now, problem batches. maybe, can use joinblock<t1, t2> or batchedjoinblock<t1, t2> here, attaching them pipeline , gathering results of joins, got full picture of work being done. maybe can implement own itargetblock<tinput> can consume messages in way.

according official docs, blocks greedy, , gather data linked 1 becomes available, join blocks may stuck, if 1 target ready , other not, or batch block has 80% of batch size, need put in mind. in case of own implementation can use itargetblock<tinput>.offermessage method information sources.

batchblock<t> capable of executing in both greedy , non-greedy modes. in default greedy mode, messages offered block number of sources accepted , buffered converted batches.

in non-greedy mode, messages postponed sources until enough sources have offered messages block create batch. thus, batchblock<t> can used receive 1 element each of n sources, n elements 1 source, , myriad of options in between.


Comments

Popular posts from this blog

commonjs - How to write a typescript definition file for a node module that exports a function? -

openid - Okta: Failed to get authorization code through API call -

ios - Change Storyboard View using Seague -