Notes on 1010 in Shakti[0]
==========================

these notes form a rough -- a very rough -- guide to the Shakti 1010 model.

0. data
-------

the data is assumed to have a specific structure:

    / .. <_>

each file-i object is a dictionary of the form

    <`table-1>..<`table-n>!(table-1;..;table-n)

tables are segmented by date. for example:

    tq/2018.05.01

is a dictionary of two tables t and q:

    !t
    `s`t`e`c`z`p
    !q
    `s`t`e`b`bz`a`az`m

this convention is significant in selection and tabulation operations.

in the example, we build four days of trade and quote data from a pair of csv files.

    \cd tq
    t:+`t`e`s`c`z`p !("scncii";",")0:"../csv/t.csv" /trades
    q:+`t`e`s`b`bz`a`az`m!("scniiiic";",")0:"../csv/q.csv" /quotes
    {($*x)1:`t`q!x,/:/:(t;q)}'+`d!2018.05.01+!4;
    \cd ..

loading the directory of n files creates n subprocesses ("workers") each of which maps the corresponding dictionary of trades and quotes.

    load"tq/"
    .m.M
    (+(,`d)!,2018.05.01 2018.05.02 2018.05.03 2018.05.04)!3 4 5 6

.m.M is a map from a table of the dates in tq/ to a vector of handles, each of which connects to the corresponding worker.

it is possible to load tables directly into the root process ("master"):

    load"weather.k"
    !+read
    `id`value
    !+stat
    `id`name`state`tz`lat`lon`elev`ant

1. ops
------

the Shakti version of the 1010 query language consists of nine "ops":

    load    create worker maps from files in a directory
    base    pick a base table from each map
    link    join a new table to the base table
    asof    asof-join a new table to the base table
    willbe  define a new column in the base table
    sel     select records in the base table
    set     set arbitrary state in each worker
    get     retrieve tables from the workers
    tabu    map-reduce aggregations on each subtable

.m.M K is a map of the "live" handles. K keeps track of which processes contain tables falling within the range of selections made in the course of query execution.

each of the functions listed above has the structure:

    {d X}

where X is a parse-tree to be executed in each worker .m.M K.
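the {d X} pattern can be pictured with a small python sketch. this is only a model of the idea, not the k implementation: the names workers, live, distribute, restrict and set_state are hypothetical, workers are plain dictionaries rather than subprocesses, and the op is a python function rather than a parse-tree.

```python
# hypothetical stand-in for .m.M: partition date -> worker state
# (real workers are separate processes reached through handles)
workers = {d: {"d": d} for d in
           ["2018.05.01", "2018.05.02", "2018.05.03", "2018.05.04"]}

# stand-in for K: the partition dates whose workers are still "live"
live = list(workers)

def distribute(op, *args):
    """apply op(state, *args) in every live worker, in partition order."""
    return [op(workers[d], *args) for d in live]

def restrict(dates):
    """narrow the live set, as a selection on the partition column would."""
    live[:] = [d for d in live if d in dates]

def set_state(state, name, value):
    """an op with a side effect: store a value in the worker's state."""
    state[name] = value
    return state["d"]

touched_all = distribute(set_state, "x", 1)    # reaches all four workers
restrict({"2018.05.02", "2018.05.03"})
touched_some = distribute(set_state, "x", 2)   # reaches only the live workers
```

once the live set has been narrowed, later ops never reach the excluded workers, so their state stays as the earlier ops left it.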
here are brief explanations of each of the functions.

1a. load
--------

    load:{.m.M:[];."\\l ",x}

the load function creates .m.M, a map from the partition column metadata to worker handles. i provide load as a function in order to define .m.M in the case where we \l a k script.

1b. base
--------

    base:{K::!.m.M;d(b;T::x);$[K;(. .m.M){Z::+`d!y}':*.!K;::];}

`d is the partition column and K is an index into .m.M, the live handles.

d is the distribution function, and is explained in detail below.

b initializes the necessary globals in each worker: T, G, W, L, I, and C.

    b:{G::W::L::[];I::!#!*C::(!+!x)!T::x}

in each worker, Z is a table of the single partition column value for that worker. e.g. +`d!20180501.

in each worker, I is a vector of indices into the base table whose name (a symbol) is T.

note that the columns of T are a subset of what we refer to as "the base table". the base table consists of T together with columns defined as "willbes" -- virtual columns -- and columns from foreign tables which are linked into the base table by means of the link and asof functions.

C is the column dictionary. for each column x in the base table, C x is a package of information whose interpretation by the function c retrieves the current state of the value of x. the c function is described in the section on columns below.

W, G, and L are caches for the values of willbe columns, g_ function columns, and link index vectors respectively. the l, g, and w functions memoize these values and store them in L, G, and W.

1c. link, asof
--------------

    link:{d(s(?;z);x;y^`d);}
    asof:{d(s(bin;z);x;y^`d);}

link and asof are lazy: given the arguments necessary to compute the link index vector, they call the k function which creates entries for each column in the foreign table. each column entry is paired with the arguments to link/asof, which are distinguished in C by the use of ? and bin (binary search).

1d. willbe
----------

    willbe:{d({C[x]:$["g"=*$*y;(y;I);y]};x;y);}

like link and asof, willbe is lazy. we don't actually compute the value of the willbe column at the point of definition, but rather store its definition in C. where x is a willbe with definition y (a parse tree) C x is simply y.

1e. sel
-------

    sel:{$[`d~*x;K@:&&/'K x;d({I@:&!p x;W::L::[]};x)];}

sel is "semi-lazy". we don't alter the structure of the base table (which wouldn't make sense in any case, since not all columns of the base table are actually present). rather, we calculate the boolean-valued expression x and use it to reset I. thus, if I is 2 3 7 9 11 and the result of &!p x is 1 2 then we reset I to 2 3 7 9 11[1 2], i.e. 3 7. since all cached values in W and L are now invalid, we must zero them out.

note that sel is where we may reset K, the index into live handles. for example, if the effect of a selection is to restrict subsequent ops to d = 2018.05.02 2018.05.03, then K will be reset to:

    +(,`d)!,2018.05.02 2018.05.03

since those workers are correlated exclusively with those dates. in other words, subsequent operations will be distributed to just those workers.

1f. get
-------

    get:{$[K;,/;::]d({+u!c'u:$[x~`;!C;x]};x)}

unlike the previous five ops, get does not modify state. rather, it returns a table containing zero or more columns in the base table.

1g. set
-------

    set:{d({!(;x;y)};x;y);}

since a query in 1010 is a sequence of ops with side-effects (some lazy), set can be used to initialize or modify *state* in each worker.

1h. tabu
--------

    tabu:{n$[K;r;a][x;y]z]}

like get, tabu returns a result and sets state in the master for subsequent queries.

tabu is the most complex of the ops we implement in this model. in d r (reduce) is applied to the result of m (map). buried in the map function (m) is the worker-call structure. map-reduce is described in a separate section.

2. column mechanics
-------------------

expressions are passed to op functions as parse-trees (sel, willbe, link, asof) or dictionaries of parse-trees (tabu).

parse-trees can be derived from k expressions with !:

    !"+/a-b"
    ((/;+);(-;`a;`b))

parse-trees are pre-processed by the p function:

    p:{$[`n=@x;(c;,x);1=#x;x;~#x;();@[x;1_!#x;p]]}

which descends through the parse-tree structure seeking column references (e.g. `f). when a column reference is found, p substitutes a call to the c function (p;c):

    !"b=1"
    (=;`b;1)
    p@!"b=1"
    (=;({$[`n=@c:C x;(!c)[x]I;`n=@*c;(!*c)[x]l . c;2=#c;(g[x]. c)I;w[x]c]};,`b);1)

------------------------------------------------------------------------

the c function can be thought of as a "column dispatcher":

    c:{$[`n=@c:C x;(!c)[x]I;`n=@*c;(!*c)[x]l . c;2=#c;(g[x]. c)I;w[x]c]}

in order to understand the logic of c, we first need to understand how column data is represented in C.

we let c = C x.

if x is a physical column in the initial table t, then the result is `t.

if x is a linked-in column from a foreign table u, then the result is (u;(c;d);(f;w)), where c and d are lists of column expressions which link u to the base-table, f is either ? or bin, depending on the link-type (link or asof), and w is a parse-tree selection expression to be applied to the foreign table.

if x is a sticky willbe then we call g on x and c and index the result by I.

if x is a willbe, then c is a parse-tree which defines x.

c dispatches x (c=C x) accordingly:

    physical column   :(!c)[x]I
    linked-in column  :(!*c)[x]l . c
    sticky willbe     :(g[x]. c)I
    willbe            :w[x]c

the function l either returns the link index vector cached in L, or computes that vector, creates a memoized entry for it in L, and returns it.

    l:{u:{+(`$$!#x)!x};$[x in!L;L x;L[x]:z[0][(u@{$[y;x@&x y;x]}[!x;z 1]'*y);u@(!p@)'y 1]]}

note that it is not necessary to flip the arguments and then use ?/: to compute the link index.
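to make the two link types and the memoization concrete, here is a small python sketch. it is an illustration under assumptions, not a translation of l: find_index, asof_index, link_index and the sample keys are hypothetical, single-column keys stand in for the multi-column case, bin is assumed to behave as in q (index of the last key not greater than the probe, -1 if there is none), and None stands in for whatever missing-index value the real code produces.

```python
from bisect import bisect_right

def find_index(keys, probes):
    """exact-match lookup (the role ? plays): first position of each probe.
    None marks a miss; the real missing-index value is not modelled here."""
    first = {}
    for i, k in enumerate(keys):
        first.setdefault(k, i)
    return [first.get(p) for p in probes]

def asof_index(sorted_keys, probes):
    """binary search (the role bin plays): position of the last key <= probe,
    or -1 when every key is greater. keys must be sorted ascending."""
    return [bisect_right(sorted_keys, p) - 1 for p in probes]

L = {}      # cache of link index vectors, keyed by column name
calls = []  # records each time an index vector is actually computed

def link_index(name, compute):
    """return the cached vector for name, computing it at most once."""
    if name not in L:
        calls.append(name)
        L[name] = compute()
    return L[name]

exact = link_index("u.x", lambda: find_index(["a", "b", "c"], ["c", "a", "z"]))
again = link_index("u.x", lambda: find_index(["a", "b", "c"], ["c", "a", "z"]))
asof = asof_index([10, 20, 30], [5, 10, 25, 40])
```

the second call to link_index returns the cached vector without recomputing it. clearing L (as sel does) would force the next lookup to compute again.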
the function w either returns the value of the willbe expression cached in W, or computes that value, creates a memoized entry for it in W, and returns it.

    w:{$[x in!W;W x;W[x]:!p y]}

3. map/reduce
-------------

the version of map-reduce used in 1010.k can also be found in m.k, where it is called directly with appropriate arguments. i will use that version.

our example uses a directory t containing two files, 2018.01.01 and 2018.01.02. each file contains a dictionary of tables; in this case, the dictionary is a singleton with domain t and range = a table of three columns f, g, and h.

we kick things off with:

    \l tq/

which creates two workers, each of which maps all the tables in the corresponding dictionary -- in this case t -- and a table in the master .m.M. .m.M is a table with a single column d:

    .m.M
    (+(,`d)!,2018.01.01 2018.01.02)!9 10

suppose our table a on worker 1 is

    f g  h
    - -- -
    1 10 0
    1 11 1
    2 12 2

and on worker 2 is

    f g  h
    - -- -
    1 20 0
    2 21 1
    2 22 2
    3 23 3

and our query is

    tabu[();`f]`a`b`c!((sum;`g);(sum;`h);(avg;`h))

then our workers will return

    f | 0   1  2
    - + --- -- -
    1 | 21f 1f 2
    2 | 12f 2f 1

and

    f | 0   1  2
    - + --- -- -
    1 | 20f 0f 1
    2 | 43f 3f 2
    3 | 23f 3f 1

where 0 is sum g, 1 is sum h, and 2 is count h. note that we compute only the elementary functions used across aggregations.

we obtain our final result by sum- and average-reduction:

    f a   b  c
    - --- -- ---------
    1 41f 1f 0.3333333
    2 55f 5f 1.666667
    3 23f 3f 3f

two further facts about map-reduce to bear in mind in the course of the analysis given below:

- there are simple aggregations such as sum, and there are complex aggregations which decompose into simple ones, such as avg, which divides sum by count. thus, we do not reduce averages by averaging, but rather by adding their component sums and dividing by the sum of their component counts.

- if we decompose complex aggregations into elementary ones, and then map these on each worker, we will avoid computing each elementary aggregation more than once.
  for example, if our tabulation is avg x and sum x, then we will compute sum x just once for both aggregations.

the suite of functions comprising map-reduce consists of m (map) and r (reduce) and a set of reductions represented by U and V:

    / map-reduce
    m:{[t;c;b;a],/$[B;D,/:';{x}]0 key'.m.M[D:#[!.m.M;C#c;()]]2:({#[!x]. y};t;((C:`d in*c)_c;(B:`d~*b)_b;a))}
    r:{[t;c;b;a]#[#[m[t;c;b]n!k;();$[`a=@b;!b;b,:()];n!V[*:'k],'n];();f[k!n:`$$!#k:?,/v'a]'a:u'a]}

    / reductions
    U:(avg;var;dev)!({(%;(sum;x);(count;x))};{({(x%z)-y*y%:z};(sum;(*;x;x));(sum;x);(count;x))};{(%:;(var;x))})
    u:{$[@x;x;f:U@*x;u'f . 1_x;u'x]}
    V!:V:(last;sum;min;max);V,:(#:;?:)!(+/;?,/)
    v:{$[@x;();V@*x;,x;,/v'x]}
    f:{$[@y;y;n:x y;n;f[x]'y]}

in order to follow the execution of our example, i've rewritten m and r to use tiny steps in the computation, and which also trace the results of each step in the console.

    mm:{[t;c;b;a]
     trace["mm:"] {[t;c;b;a]}
     trace["t:"] t
     trace["c:"] c
     trace["b:"] b
     trace["a:"] a
     trace["d:(B:`d~*b)_b"] d:(B:`d~*b)_b
     trace["e:(C:`d in*c)_c"] e:(C:`d in*c)_c
     trace["f:{#[!x]. y}"] f:{#[!x]. y}
     trace["g:(f;t;(e;d;a))"] g:(f;t;(e;d;a))
     trace["D:#[!.m.M;C#c;()]"] D:#[!.m.M;C#c;()]
     trace["q:.m.M[D]2:g"] q:.m.M[D]2:g
     trace["r:0 key'q"] r:0 key'q
     trace["h:$[B;D,/:';{x}]"] h:$[B;D,/:';{x}]
     trace["s:,/h r"] s:,/h r
     s}

    rr:{[t;c;b;a]
     trace[""] "press enter to advance through the calculation"
     trace["rr:"] {[t;c;b;a]}
     trace["t:"] t
     trace["c:"] c
     trace["b:"] b
     trace["a:"] a
     trace["a:u'a"] a:u'a
     trace["k:?,/v'a"] k:?,/v'a
     trace["n:`$$!#k"] n:`$$!#k
     trace["h:mm[t;c;b]n!k"] h:mm[t;c;b]n!k
     trace["d:f[k!n]'a"] d:f[k!n]'a
     trace["e:n!V[*:'k],'n"] e:n!V[*:'k],'n
     trace["g:$[`a=@b;!b;b,:()]"] g:$[`a=@b;!b;b,:()]
     trace["r:#[h;();g;e]"] r:#[h;();g;e]
     trace["s:#[r;();d]"] s:#[r;();d]
     s}

    / example map-reduce
    rr[`t;();`f]`a`b`c!((sum;`g);(sum;`h);(avg;`h))

rr and mm are both called with table t, where-clause c, by-clause b, and a dictionary of aggregation parse-trees a:

    rr:{[t;c;b;a]}
    t:`t
    c:()
    b:`f
    a:
       |
    -- + --- --
    `a | sum `g
    `b | sum `h
    `c | avg `h

reduce each aggregation to an expression in elementary operations:

    a:u'a
    [a:(sum;`g);b:(sum;`h);c:(%;(sum;`h);(#:;`h))]

find the unique elementary operations:

    k:?,/v'a
    ((sum;`g);(sum;`h);(#:;`h))

n is a vector of synthetic domain-names:

    n:`$$!#k
    `0`1`2

call mm with t=t, c=c, b=b, and a=n!k:

    mm:{[t;c;b;a]}
    t:`t
    c:()
    b:`f
    a:
       |
    -- + --- --
    `0 | sum `g
    `1 | sum `h
    `2 | #:  `h

d is the segmentation column - it is not a column in t, so it may not appear in the executable by-clause or where-clause:

    d:(B:`d~*b)_b
    ,`f

e is the where-clause constraint column. again, we delete constraints referring to d:

    e:(C:`d in*c)_c
    ()

the selection function to be executed in the server:

    f:{#[!x]. y}
    {#[!x]. y}

g = (selection function;table;where-clause;by-clause;aggregations):

    g:(f;t;(e;d;a))
    ({#[!x]. y};`t;(();,`f;[0:(sum;`g);1:(sum;`h);2:(#:;`h)]))

D = dates selected in the where-clause c:

    D:#[!.m.M;C#c;()]
    d
    ----------
    2018.01.01
    2018.01.02

q = selection from each server:

    q:.m.M[D]2:g
    f | 0   1  2
    - + --- -- -
    1 | 21f 1f 2
    2 | 12f 2f 1

    f | 0   1  2
    - + --- -- -
    1 | 20f 0f 1
    2 | 43f 3f 2
    3 | 23f 3f 1

dekey each result:

    r:0 key'q
    f 0   1  2
    - --- -- -
    1 21f 1f 2
    2 12f 2f 1

    f 0   1  2
    - --- -- -
    1 20f 0f 1
    2 43f 3f 2
    3 23f 3f 1

glue D to each record if d is in the original by-clause:

    h:$[B;D,/:';{x}]
    {x}

raze the dekeyed results - *** return to rr:

    s:,/h r
    f 0   1  2
    - --- -- -
    1 21f 1f 2
    2 12f 2f 1
    1 20f 0f 1
    2 43f 3f 2
    3 23f 3f 1

*** back in rr - h is the result s of mm -- note that its columns contain the unique elementary calculations required for the reduction phase (and that the value-column names are synthetic). we will do the initial aggregation on h:

    h:mm[t;c;b]n!k
    f 0   1  2
    - --- -- -
    1 21f 1f 2
    2 12f 2f 1
    1 20f 0f 1
    2 43f 3f 2
    3 23f 3f 1

f (the global function - see above) will map actual column-names in the aggregation-dictionary to synthetic domain-names:

    d:f[k!n]'a
    [a:`0;b:`1;c:(%;`1;`2)]

e says how to aggregate the elementary operations.
in this case, we sum both sums and counts:

    e:n!V[*:'k],'n
       |
    -- + --- --
    `0 | sum `0
    `1 | sum `1
    `2 | sum `2

normalize the by-clause:

    g:$[`a=@b;!b;b,:()]
    ,`f

aggregate the results of the elementary operations:

    r:#[h;();g;e]
    f | 0   1  2
    - + --- -- --
    1 | 41f 1f 3f
    2 | 55f 5f 3f
    3 | 23f 3f 1f

and finally calculate the aggregations themselves (sum, average, &c.):

    s:#[r;();d]
    f | a   b  c
    - + --- -- ---------
    1 | 41f 1f 0.3333333
    2 | 55f 5f 1.666667
    3 | 23f 3f 3f

Footnotes
---------

[0] this shakti implementation continues the sequence of k3 implementations described here:

    http://www.nsl.com/k/k3_1010/1010.txt

the unoptimized single table version for k3 is:

    http://www.nsl.com/k/k3_1010/1010_0.k

and the optimizing version is:

    http://www.nsl.com/k/k3_1010/1010_1.k

[1] in q (k4) we have

    q)a:([]d:10 20 30)
    q)b:0 1 2
    q)a!b
    d |
    --| -
    10| 0
    20| 1
    30| 2

also note that in shakti we recommend a verbal distinction between dictionaries, which are maps from a domain of names, and the more general form whose domain is a list of objects.

[2] in k4: select a by b from t where c

[3] in k4: select a from t where c.
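as a cross-check on the section 3 walkthrough, the two-phase reduction can be re-done in a few lines of python. this is a sketch under assumptions, not the k code: map_partials and reduce_partials are hypothetical names, the worker tables are the two example tables from section 3 written as (f, g, h) tuples, and the three elementary results (sum g, sum h, count h) are hard-coded rather than derived from parse-trees.

```python
from collections import defaultdict

# the example tables from section 3, one list of (f, g, h) rows per worker
worker1 = [(1, 10, 0), (1, 11, 1), (2, 12, 2)]
worker2 = [(1, 20, 0), (2, 21, 1), (2, 22, 2), (3, 23, 3)]

def map_partials(rows):
    """map step, run per worker: group by f and compute the elementary
    results [sum g, sum h, count h] for each group."""
    out = defaultdict(lambda: [0, 0, 0])
    for f, g, h in rows:
        part = out[f]
        part[0] += g
        part[1] += h
        part[2] += 1
    return dict(out)

def reduce_partials(parts):
    """reduce step, run in the master: add sums to sums and counts to
    counts, then form a = sum g, b = sum h, c = sum h / count h."""
    total = defaultdict(lambda: [0, 0, 0])
    for part in parts:
        for f, vals in part.items():
            for i, v in enumerate(vals):
                total[f][i] += v
    return {f: {"a": t[0], "b": t[1], "c": t[1] / t[2]}
            for f, t in sorted(total.items())}

partials = [map_partials(worker1), map_partials(worker2)]
result = reduce_partials(partials)
```

the average for each f is formed only after the sums and counts from both workers have been added, which is why averaging the per-worker averages is never needed.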