Spark processing of streaming data using count-min sketches
The stream generator continuously generates a stream S of the following format: <sid, ip>. Each entry of this stream represents a single request from a client with IP address ip to a server with ID sid. The order of the entries in the stream corresponds to the arrival order of the requests. The total number of servers is 100 (sid is between 0 and 99, inclusive). The number of distinct IP addresses is not known and varies throughout the stream. A possible excerpt from the stream is shown below:
…
4, 198.3.5.1
6, 139.3.7.15
5, 221.6.27.251
6, 139.3.7.15
5, 198.3.5.1
…
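A minimal parser for the record format above, written in plain Python (no Spark dependency) so it can be checked in isolation. The names `parse_record`, `parse_stream` and `NUM_SERVERS` are hypothetical; the sketch assumes each line is exactly `<sid>, <ip>` with a comma separator, as in the excerpt, and treats the `…` lines of the excerpt as elisions rather than stream content.

```python
from typing import Iterable, Iterator, Tuple

NUM_SERVERS = 100  # sid is between 0 and 99, inclusive


def parse_record(line: str) -> Tuple[int, str]:
    """Parse one '<sid>, <ip>' line into (sid, ip); raises ValueError if malformed."""
    sid_text, ip = (part.strip() for part in line.split(",", 1))
    sid = int(sid_text)
    if not 0 <= sid < NUM_SERVERS:
        raise ValueError(f"server id out of range: {sid}")
    return sid, ip


def parse_stream(lines: Iterable[str]) -> Iterator[Tuple[int, str]]:
    """Yield (sid, ip) pairs in arrival order, skipping blank lines."""
    for line in lines:
        if line.strip():
            yield parse_record(line)
```

For example, `parse_record("4, 198.3.5.1")` returns `(4, "198.3.5.1")`.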
The stream is generated by a JAR file called “stream.jar” and can be run with “java -jar stream.jar 1200 2000 26” from the directory containing the JAR. This makes the stream available at hostname “localhost”, port 9000. The three arguments 1200, 2000 and 26 define, respectively, the time in seconds after which the stream times out, the output rate in rows per second, and a number that influences some characteristics of the stream.
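Before wiring up Spark, it can help to confirm that the generator is reachable. The sketch below is a hypothetical stdlib-only helper, `read_lines`, which assumes the generator writes newline-terminated text over plain TCP. Inside a Spark program the same source would normally be consumed through Spark's socket source instead (for example `StreamingContext.socketTextStream("localhost", 9000)` in the DStream API).

```python
import socket
from typing import Iterator


def read_lines(host: str = "localhost", port: int = 9000,
               timeout: float = 10.0) -> Iterator[str]:
    """Yield text lines from a TCP stream until the server closes the connection."""
    with socket.create_connection((host, port), timeout=timeout) as sock:
        # makefile() wraps the socket in a buffered text reader that splits on newlines.
        with sock.makefile("r", encoding="utf-8", newline="") as reader:
            for line in reader:
                yield line.rstrip("\r\n")
```

While `stream.jar` is running, `for line in read_lines(): print(line)` should print raw records; the 10-second read timeout is an arbitrary choice.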
The following parameters are given: τ = 3000 (a threshold), w = 60 (the duration of the sliding window, in seconds), 𝜖 = 0.001 (the acceptable error on the similarity function) and 𝛿 = 0.01 (the acceptable probability that the error exceeds 𝜖).
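Because CM sketches are linear (the sketch of a sum of frequency vectors is the sum of their sketches), one way to maintain the w = 60 second window is to keep one summary per second, add to the newest slot, and subtract slots as they expire. The sketch below shows that bookkeeping with exact `Counter`s standing in for per-second CM sketches. The class name `SlidingWindowCounts`, the one-second granularity and the assumption of non-decreasing integer timestamps are illustrative choices, not part of the assignment.

```python
from collections import Counter, deque
from typing import Deque, Hashable, Tuple


class SlidingWindowCounts:
    """Per-key counts over the last `window` seconds, kept as one slot per second.

    Assumes `second` values are whole seconds since program start and never
    decrease. Exact Counters stand in for per-second CM sketches here.
    """

    def __init__(self, window: int = 60) -> None:
        self.window = window
        self.slots: Deque[Tuple[int, Counter]] = deque()  # (second, counts in that second)
        self.total: Counter = Counter()  # running sum over the live slots

    def add(self, second: int, key: Hashable, count: int = 1) -> None:
        self._expire(second)
        if not self.slots or self.slots[-1][0] != second:
            self.slots.append((second, Counter()))
        self.slots[-1][1][key] += count
        self.total[key] += count

    def counts(self, now: int) -> Counter:
        """Counts for the window (now - window, now]."""
        self._expire(now)
        return self.total

    def _expire(self, now: int) -> None:
        # A slot expires once it is `window` or more seconds old.
        while self.slots and self.slots[0][0] <= now - self.window:
            _, old = self.slots.popleft()
            self.total.subtract(old)  # linearity: remove the expired slot's counts
            self.total += Counter()  # drop keys whose count fell to zero
```

With CM sketches the same add and subtract steps apply cell by cell to the counter matrices.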
Let …
The goal is to monitor all arrivals in …
The computation of the exact value of …
The Spark code should print at regular intervals (every 2 seconds):
- the number of processed updates from the start of the program
- the number of pairs contained in the estimated answer, i.e., the number of pairs with $sim(X,Y) \geq τ$
- the time, in seconds, since the start of the program
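The reporting requirements above can be sketched as two small helpers. `count_similar_pairs` and `report_line` are hypothetical names. The sketch assumes pairs are unordered and exclude X = Y (so there are at most 100 · 99 / 2 = 4950 pairs), and it takes the similarity function as a parameter so that a sketch-based estimate of `sim` can be plugged in.

```python
import itertools
from typing import Callable, Iterable


def count_similar_pairs(servers: Iterable[int],
                        sim: Callable[[int, int], float],
                        tau: float) -> int:
    """Count unordered pairs (X, Y), X != Y, whose similarity is at least tau."""
    return sum(1 for x, y in itertools.combinations(servers, 2) if sim(x, y) >= tau)


def report_line(processed: int, pairs: int, start: float, now: float) -> str:
    """Status line: updates processed so far, pairs with sim >= tau, seconds elapsed."""
    return f"processed={processed} pairs={pairs} elapsed={now - start:.1f}s"
```

In a driver loop, `start = time.monotonic()` would be taken when the program begins and `report_line(processed, pairs, start, time.monotonic())` printed every 2 seconds; with a 2-second Spark batch interval, one report per batch is one possible fit.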
Similarity can be measured using the dot product. Because computing the exact value of the similarity requires extensive space and computation, Count-Min (CM) sketches can be used to summarize the vectors in sub-linear space. A CM sketch is a matrix with size …
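As an illustration, the sketch below builds a CM sketch with the standard sizing from the Count-Min literature: w = ⌈e/𝜖⌉ counters per row and d = ⌈ln(1/𝛿)⌉ rows, which for 𝜖 = 0.001 and 𝛿 = 0.01 gives w = 2719 and d = 5. Here w is the sketch width, unrelated to the 60-second window that is also called w. The hash family ((a·x + b) mod p) mod w, the use of `zlib.crc32` to turn an IP string into an integer, and the `seed` parameter are assumptions. What matters is that the sketches of two servers share the same hash functions, otherwise their counters cannot be multiplied cell by cell.

```python
import math
import random
import zlib
from typing import List, Tuple


class CountMinSketch:
    """Count-Min sketch: d rows of w counters each, stored as table[j][k]."""

    PRIME = 2_147_483_647  # Mersenne prime 2^31 - 1 used by the hash family

    def __init__(self, eps: float, delta: float, seed: int = 42) -> None:
        self.w = math.ceil(math.e / eps)  # counters per row
        self.d = math.ceil(math.log(1 / delta))  # rows (one hash function each)
        rng = random.Random(seed)  # same seed => same hash functions across sketches
        self.hashes: List[Tuple[int, int]] = [
            (rng.randrange(1, self.PRIME), rng.randrange(0, self.PRIME))
            for _ in range(self.d)
        ]
        self.table: List[List[int]] = [[0] * self.w for _ in range(self.d)]

    def _bucket(self, j: int, key: str) -> int:
        a, b = self.hashes[j]
        x = zlib.crc32(key.encode("utf-8"))  # deterministic, unlike hash() on str
        return ((a * x + b) % self.PRIME) % self.w

    def update(self, key: str, count: int = 1) -> None:
        for j in range(self.d):
            self.table[j][self._bucket(j, key)] += count

    def estimate(self, key: str) -> int:
        """Point query: never below the true count when all updates are positive."""
        return min(self.table[j][self._bucket(j, key)] for j in range(self.d))
```

Assuming each server's vector counts requests per IP, one such sketch per sid would receive `update(ip)` for every arrival addressed to that server.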
- $(sk_A * sk_B)_j = \sum_{k=1}^{w} sk_A[k, j] * sk_B[k, j]$, for every row $j$
- $(sk_A * sk_B) = \min_{j} (sk_A * sk_B)_j$
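The two formulas above translate directly into code. The sketch below operates on plain nested lists stored row first, `table[j][k]`, which corresponds to sk[k, j] in the formulas; `row_products` and `dot_estimate` are hypothetical names. With non-negative counts, collisions can only add to a row's sum, so every row over-estimates the exact dot product and the minimum keeps the smallest over-estimate. The standard Count-Min analysis bounds the excess by 𝜖‖a‖₁‖b‖₁ with probability at least 1 − 𝛿, where a and b are the summarized vectors.

```python
from typing import List, Sequence

# table[j][k]: row j (one hash function), counter k; this is sk[k, j] in the text.
Table = Sequence[Sequence[int]]


def row_products(sk_a: Table, sk_b: Table) -> List[int]:
    """(sk_A * sk_B)_j = sum over k of sk_A[k, j] * sk_B[k, j], for every row j."""
    if len(sk_a) != len(sk_b) or any(len(ra) != len(rb) for ra, rb in zip(sk_a, sk_b)):
        raise ValueError("sketches must have the same number of rows and counters")
    return [sum(x * y for x, y in zip(ra, rb)) for ra, rb in zip(sk_a, sk_b)]


def dot_estimate(sk_a: Table, sk_b: Table) -> int:
    """sk_A * sk_B = min over rows j of (sk_A * sk_B)_j."""
    return min(row_products(sk_a, sk_b))
```

For instance, suppose server A saw IP₁ twice and IP₂ once, and server B saw IP₁ three times and IP₃ four times, so the exact dot product is 2 · 3 = 6. If row 0 hashes IP₁, IP₃, IP₂ to counters 0, 1, 2, and row 1 sends both IP₁ and IP₂ to counter 1 and IP₃ to counter 2, the sketches are A = [[2, 0, 1], [0, 3, 0]] and B = [[3, 4, 0], [0, 3, 4]]. The row products are 6 and 9, and the minimum, 6, recovers the exact value despite the collision in row 1.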