Streaming and Incremental Processing
- The could-result-in relation is not clearly explained in the paper
- Distributed progress tracking is not very clear: how do nodes know whether there is more input?
- Is it worthwhile to build a whole new system like Naiad? I.e., is the poor UI/UX inherent, or just an artifact of the prototype?
A streaming perspective on query execution
- Running Q(D) is well defined via relational algebra/semantics
- Implemented as a query plan P with access methods over D
- If the execution model is push-based, then access methods push records up the plan (sketched below)
- But we only release the result after the next clock tick (i.e., all records have been read and processed by all operators)
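A minimal sketch of the push-based model (the operator names are illustrative, not from the paper): the access method pushes each record up the plan, and the result is released only at the clock tick.

    class Select:
        # Pushes a record to its parent only if the predicate holds.
        def __init__(self, predicate, parent):
            self.predicate, self.parent = predicate, parent
        def push(self, record):
            if self.predicate(record):
                self.parent.push(record)

    class CollectResult:
        # Buffers records; the result is exposed only at the "clock tick".
        def __init__(self):
            self.buffer = []
        def push(self, record):
            self.buffer.append(record)
        def finish(self):
            return self.buffer  # all records read and processed by all operators

    result = CollectResult()
    plan = Select(lambda r: r[1] > 3, result)
    for record in [("a", 2), ("b", 6), ("a", 5)]:  # access method scans D once
        plan.push(record)
    print(result.finish())  # [('b', 6), ('a', 5)]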
Can we think of it incrementally?
- Only allowed to read once from access method
- Push from access method is like new record from a stream
- Query result initially empty (correct if inputs are empty)
- new records update query result.
- only expose result once all inputs are read
- Can we expose intermediate results earlier?
- What if input is not absolutely ordered?
- How much state do we need to keep?
- Are we restricted to tree-shaped plans?
- What core properties do operators need to expose?
- We discussed how aggregation is a blocking operator
- it can't emit values before it sees all inputs
- what if we aggregate on time, and the input has some ordering?
- then we can emit timestep by timestep (see the sketch below)
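If the input is ordered on the time attribute, the arrival of a later timestep signals that the current group is complete, so the aggregate can emit incrementally. A minimal sketch (the ordering assumption does all the work):

    def sum_by_time(stream):
        """Emit (t, sum) as soon as the input moves past timestep t.

        Assumes `stream` yields (t, value) pairs in nondecreasing t order,
        so seeing t' > t guarantees no more records for timestep t."""
        current_t, acc = None, 0
        for t, value in stream:
            if current_t is not None and t != current_t:
                yield current_t, acc  # safe to emit: timestep current_t is done
                acc = 0
            current_t = t
            acc += value
        if current_t is not None:
            yield current_t, acc      # end of input closes the last timestep

    print(list(sum_by_time([(1, 2), (1, 3), (2, 7), (3, 1)])))
    # [(1, 5), (2, 7), (3, 1)]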
Consider recursive queries
WITH RECURSIVE paths AS (
    SELECT dst, w AS totalw
    FROM edges
    WHERE src = 'A'
  UNION ALL
    SELECT edges.dst, paths.totalw + edges.w AS totalw
    FROM paths, edges
    WHERE paths.dst = edges.src
)
SELECT * FROM paths;
- run the initialization query as the 'input', call its result v0
- run the recursive query component using the records @v0, collect the new records as v1
- repeat with @v1, @v2, … @vi until no more messages
- This is bulk synchronous processing (a sketch follows this list)
- the versioning is implicit in which "step" of BSP we are in
- What if we explicitly tracked the versions instead?
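A sketch of the BSP loop in Python, with a made-up acyclic edge set so the iteration terminates:

    # edges: (src, dst, w); each pass of the while loop is one BSP superstep.
    edges = [("A", "B", 1), ("B", "C", 2), ("A", "C", 5)]

    delta = [(dst, w) for (src, dst, w) in edges if src == "A"]  # v0: init query
    paths = list(delta)
    while delta:  # no more messages => done
        delta = [(e_dst, total + w)          # recursive component: join with edges
                 for (dst, total) in delta
                 for (e_src, e_dst, w) in edges
                 if dst == e_src]
        paths.extend(delta)                  # v1, v2, ... accumulate
    print(paths)  # [('B', 1), ('C', 5), ('C', 3)]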
Consider Spark queries
- Run page rank using iterative computation
- What does its lineage graph look like after 2 iterations? 10? 100?
- grows with each iteration; the master lineage graph must be updated
- What if the input changes (e.g., a new record arrives)?
- What about Spark Streaming?
- can get it to work if intermediate state is externalized as intermediate RDDs
- can implement the shortest-paths query from above (a sketch follows below)
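A rough PySpark sketch of the iterative computation (assumes a local pyspark install; the checkpoint cadence is arbitrary). Each iteration adds a join/reduce layer to the lineage of dists; checkpointing externalizes the state and truncates that lineage:

    from pyspark import SparkContext

    sc = SparkContext("local", "iterative-paths")
    sc.setCheckpointDir("/tmp/spark-ckpt")

    # edges keyed by source: (src, (dst, w)); dists: (node, best distance from 'A')
    edges = sc.parallelize([("A", ("B", 1)), ("B", ("C", 2)), ("A", ("C", 5))]).cache()
    dists = sc.parallelize([("A", 0)])

    for i in range(5):
        # relax one hop: join current distances with outgoing edges
        hop = dists.join(edges).map(lambda kv: (kv[1][1][0], kv[1][0] + kv[1][1][1]))
        dists = dists.union(hop).reduceByKey(min)
        if i % 2 == 1:
            dists.checkpoint()  # externalize intermediate state, truncating lineage
    print(sorted(dists.collect()))  # [('A', 0), ('B', 1), ('C', 3)]
    sc.stop()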
What is Naiad good for that prior systems could not do, or had trouble doing?
- low latency
- “consistent intermediate outputs”
- The paper claims
- batch processing can iterate, but blocks
- stream processing doesn't do iteration
- triggers can iterate, but with no consistency guarantees (what does this mean?)
- it is low-level infrastructure, not for end-users
Contrast with Spark Streaming (mini-batches)
- take the input stream and partition it by time into mini-batches
- run Spark jobs over the batches in parallel (can further partition if really wanted)
- output a stream of batches
- computation doesn't cross partition boundaries (see the sketch below)
- logical hierarchical timestamps
- input attaches epoch e
- a step() function tells the system it won't receive more messages with epoch <= e
- ingress: enter this loop; push a new counter
- egress: done with the loop; pop the counter
- feedback: go around the loop; increment the counter
- t1 < t2 based on lexicographic order
- for any path o1 ~> o2, the timestamp at o2 can only be greater than or equal to the timestamp at o1 for the same message
- given the dataflow graph structure, we can deterministically know how any path of operators will change the timestamps (see the sketch below)
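A sketch of these transformations, modeling a timestamp as a tuple (epoch, loop counters...) so Python's lexicographic tuple comparison gives the ordering:

    def ingress(t):  return t + (0,)               # entering a loop: push counter 0
    def egress(t):   return t[:-1]                 # leaving the loop: pop the counter
    def feedback(t): return t[:-1] + (t[-1] + 1,)  # around the loop: increment it

    t = (1,)                   # epoch 1, outside any loop
    t = ingress(t)             # (1, 0)
    t = feedback(feedback(t))  # (1, 2): went around the loop twice
    assert (1, 0) < (1, 2) < (2,)  # lexicographic order on tuples
    print(egress(t))           # (1,): back to an epoch-only timestamp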
- pointstamp: timestamp + operator/edge
- (t1, o1) could-result-in (t2, o2) iff there exists a path o1 ~> o2 that updates t1 to some timestamp <= t2
- note this is statically determined
- pointstamp (t, o)
- occurrence count: number of outstanding events with pointstamp
- maintain on calls of send/notify
- active pointstamp if occurrence > 0
- precursor count: number of active pointstamps that could-result-in (t, o)
- when (t, o) becomes active, set its precursor count to the number of preceding active pointstamps
- when (t, o) becomes inactive, decrement the precursor count of every pointstamp it could-result-in
- frontier: precursor count is 0
- can deliver a notification for t (a toy tracker is sketched below)
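A toy version of this bookkeeping, assuming the could-result-in relation is given as a precomputed map (it is statically determined from the graph):

    from collections import defaultdict

    class ProgressTracker:
        def __init__(self, could_result_in):
            self.cri = could_result_in   # cri[p] = pointstamps p could-result-in
            self.occ = defaultdict(int)  # occurrence counts
            self.prec = {}               # precursor counts of active pointstamps

        def send(self, p):               # a new outstanding event with pointstamp p
            self.occ[p] += 1
            if self.occ[p] == 1:         # p becomes active
                count = sum(1 for q in self.prec if p in self.cri.get(q, set()))
                for q in self.cri.get(p, set()):
                    if q in self.prec:
                        self.prec[q] += 1
                self.prec[p] = count     # number of preceding active pointstamps

        def done(self, p):               # the event is retired
            self.occ[p] -= 1
            if self.occ[p] == 0:         # p becomes inactive:
                del self.prec[p]         # decrement everything p could-result-in
                for q in self.cri.get(p, set()):
                    if q in self.prec:
                        self.prec[q] -= 1

        def frontier(self):
            # precursor count 0: notifications for these can be delivered
            return [p for p, c in self.prec.items() if c == 0]

    cri = {(1, "A"): {(1, "B")}}  # a path A ~> B that leaves epoch 1 unchanged
    pt = ProgressTracker(cri)
    pt.send((1, "A")); pt.send((1, "B"))
    print(pt.frontier())          # [(1, 'A')]: B's notification must wait
    pt.done((1, "A"))
    print(pt.frontier())          # [(1, 'B')]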
in --> A --> Ingress --> B --> C --> Egress --> D --> Out
                         ^     |
                         +--F--+
(F is the Feedback operator: C routes records back around the loop to B)
- Messages are (key, value)
- A is a distinct-by-key operator
- for inputs of (key, value), it emits only the first record for each distinct key
- B is a projection; per the trace below, it doubles the value
- C is a filter
- sends its output to Egress if the input value is greater than 10, and to Feedback otherwise
- D is an aggregation operator
- sums all values irrespective of key.
What if we inject some records?
(a, 2), (b, 6), (a, 5)   (all in epoch 1)
- (a, 2):
- output of each operator (timestamps written as [epoch, (loop counter)])
- A: (a, 2) @ [1]
- B: (a, 4) @ [1, (0)]
- C: (a, 4) @ [1, (0)]
- B: (a, 8) @ [1, (1)]
- C: (a, 8) @ [1, (1)]
- B: (a, 16) @ [1, (2)]
- C: (a, 16) @ [1, (2)]
- D: receives (16) @ [1]
- (b, 6): goes through the loop once, 6 -> 12, so D also receives (12) @ [1]
- (a, 5): suppressed by A (key a was already seen)
- D ultimately emits the sum 16 + 12 = 28 once epoch 1 is complete (a toy simulation follows below)
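A toy single-epoch simulation of this trace (collapsing A/Ingress/B/C/Feedback/Egress/D into one loop; B's doubling and C's >10 test are as described above):

    def run_epoch(records, epoch=1):
        seen, total = set(), 0
        for key, value in records:
            if key in seen:          # A: distinct-by-key suppresses repeats
                print(f"A suppressed ({key}, {value})")
                continue
            seen.add(key)
            counter = 0              # Ingress: push loop counter 0
            while True:
                value *= 2           # B: the doubling projection
                print(f"B/C: ({key}, {value}) @ [{epoch}, ({counter})]")
                if value > 10:       # C: exit via Egress (counter popped)
                    break
                counter += 1         # Feedback: increment the loop counter
            total += value           # D: sum across all keys
        print(f"D @ [{epoch}]: {total}")

    run_epoch([("a", 2), ("b", 6), ("a", 5)])  # D ends at 16 + 12 = 28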
When does E.onNotify(1) get called?
- on the step() call, the system knows epoch 1 is done
- then wait until all active pointstamps with timestamp <= 1 upstream of E are done
Can we implement SQL operators?
- group-by aggregation
- symmetric hash join (sketched below)
- Is it different than Spark’s central
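Group-by aggregation could buffer per-key state and emit on notification, like D above. For the join, a minimal sketch of a streaming symmetric hash join: each input keeps a hash table on the join key, and each arriving record probes the other side's table and emits matches immediately.

    from collections import defaultdict

    class SymmetricHashJoin:
        # State: one hash table per input, keyed on the join attribute.
        def __init__(self):
            self.left, self.right = defaultdict(list), defaultdict(list)

        def on_left(self, key, value):
            self.left[key].append(value)
            return [(key, value, r) for r in self.right[key]]  # probe right side

        def on_right(self, key, value):
            self.right[key].append(value)
            return [(key, l, value) for l in self.left[key]]   # probe left side

    j = SymmetricHashJoin()
    print(j.on_left("a", 1))   # []: no right-side match yet
    print(j.on_right("a", 2))  # [('a', 1, 2)]: emitted incrementally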