Process Graphs


Node systems

A node system consists of a master node (Alpha) and a system of worker nodes. There is only one output node, called "Omega". No node depends on Omega. A node without dependencies is called an input node. There is always at least one input node.

A node processor is defined by a set of disk objects in a specified "metadata" directory m/:

	m/			metadata directory
	  dag.k			node dependency dag
	  host.k		optional host assignment
	  alpha.k		optional extension to Alpha API
	  node/
	    :
	    [node].k		optional script for [node] in dag
	    :
	    omega.k		optional script for Omega
	    default.k		optional default node script

dag.k is a k script which defines the dag of the system. For example, the dag

	omega
	  a
	    c
	      g
	      h
 	    d
	      g
	      i
	  b
	    e
	      h
	      i
	      j
	    f
	      i
	      k

is defined by the script:

	omega:`a`b	/ keyword:  omega must be present
	a:`c`d`e
	b:`e`f
	c:`g`h
	d:`g`i
	e:`h`i`j
	f:`i`k

Omega depends on a and b. a depends on c, d, e. &c. g, h, i, j, and k are input nodes. Loops are not allowed.

m/alpha.k is an optional script which extends the API of the Alpha processor.

m/host.k is an optional script which assigns to selected nodes a forwarding address on a remote host. For example,

	a:b:(`ABC;1234)
	d:(`DEF;4567)

says that nodes a and b are started by the host server on ABC with port 1234, and that node d is started by the host server on DEF with port 4567.

m/node is a subdirectory of scripts, one per node. For example:

	m/nodes
	  a.k
	  b.k
	  e.k
	  default.k

Each node tries to load its own script. If it fails, it loads the default script.

Node processing

A process is started by invoking the shell script with command line arguments. The first argument in the command line is the script to load. Subsequent arguments are stored in .i, where they are available within the script:

	k n/shell [script] ...

(See below for the rationale behind this mechanism of argument passing.)

A node system is started by starting an Alpha process:

	k n/shell n/alpha [port] [dir] [max] ...

The shell script stores the command line arguments in .i and loads the system script n/alpha.k.

The Alpha scripts reads the dag file in [dir] and constructs a diagram of the node dependencies.

Alpha opens all input nodes.

A node is opened by starting a process, either locally:

	k n/shell n/node [host] [port] [node] ...

or remotely, through the host server defined for that node (see below, section 10).

The shell script stores the command line arguments in .i and loads the system script n/node.k.

The node script connects to Alpha and sets up various functions and variables in .n. The final step in opening a node is to load the script for that node from dir/node.

Messages

Alpha recognizes four types of asynchronous sender:

	- a connected node (_w _in Nodes)
	- a connected Mu (_w _in Mu)
	- itself (_w = Alpha)
	- an unknown sender

A node can send information to Alpha in two ways:

	.n.alpha(function_symbol;argument_list)
	.n.node(function_symbol;argument_list)

The alpha method is used to apply Alpha functions to node arguments. Messages transmitted by this method are not replicated to downstream nodes.

The node method is used to relay messages by way of Alpha to downstream nodes. When Alpha receives a node message, it logs it and sends it on to dependent nodes for evaluation. Alpha is a router for node messages.

A message from Alpha to itself is executed immediately (untrapped) and logged.

A message from a Mu or unknown sender is executed immediately (untrapped) but not logged.

Alpha recognizes three types of synchronous message:

	- a connecting node (message _in Nodes)
	- a connecting Mu (message ~ _n)
	- an unknown sender (other)

A connecting node n receives (Dir;ante n). The event is logged.

A connecting Mu receives (Dir;Dag;Log). The event is not logged.

Messages from unknown senders are executed (untrapped) but not logged.

Open

Initially, Alpha opens all input nodes.

Eventually, Alpha needs to send a message to a node which has not been opened. It attempts to send this message and fails. It then appends the failed message to the queue and tries to open the node (see below, section 8, for a discussion of queued open). Later, the node connects. Alpha checks to see whether there are messages pending on the queue. If so, it sends itself a message to process those messages. When that message arrives, pending messages are removed from the queue and sent to the node.

A node is opened locally by executing the command:

	start "[dir]/[node]" /min cmd.exe /c k n/shell n/node [host] [port] [node] ...

and remotely by sending to a host server the list of strings:

	$(_h;_p;[node];...)

If the attempt to open fails, an error record is appended to Log:

	(_t;`error;;`open;node)

and a second attempt is made to open the node.

Close

There are two ways for a node to close itself:

	\\
	.n.alpha(`close;.n.Node)

If a node executes \\, Alpha detects the close and reopens the node. Alpha regards \\ as an abnormal end.

A node can ask for an orderly shutdown by sending the `close message. Alpha then *synchronously* executes .m.c on the node. Default .m.c is "\\\\".

A node can execute \\ to restart itself with pristine memory.

If Alpha closes, all nodes close.

Mu

Dag is a list of two rows: if node a depends on node b, then there is a column i of Dag such that Dag[0;i]=`a and Dag[1;i]=`b.

Messages sent to Alpha are either `alpha or `node operations. Messages sent from Alpha are `out operations. There are three `event operations: `open, `connect, and `close, and one `error message.

Operations are appended to Log, a table with five columns: t (time), o (operation), n (node), f (function), and a (argument).

Possible operation rows:

	(_t;`node;from_node;function_symbol;argument_list)
	(_t;`alpha;from_node;function_symbol;argument_list)
	(_t;`out;to_node;function_symbol;argument_list)
	(_t;`event;;`open;opened_node)
	(_t;`event;;`connect;connected_node)
	(_t;`event;;`close;closed_node)
	(_t;`error;;`open;opened_node)

Log keeps a maximum of 1000 messages.

A special process, called Mu, can connect to Alpha and receive the current Dir, Dag, and Log variables:

	k n/shell n/mu [host] [port]

Connection establishes an interest in subsequent operations, which Alpha appends to .n.mu.Log in every Mu process.

The connection can be broken by shutting down (\\) or closing the connection:

	3:.n.mu.Alpha

Messages sent asynchronously by Mu to Alpha are executed immediately and not logged:

	.n.mu.alpha(function_symbol;argument_list)

Monitor and control applications can be built around Mu; for example, stopping and starting nodes, progress reports in lengthy computations, &c.

A node can be a Mu for another node system. For example, suppose that a should execute some procedure when b in a's system is done and when c and d in some other system are done. Then a can be a Mu for that other system, and execute the procedure when it has received a done message from b and when Alpha for that other system has received done messages from c and d.

Nesting

A node can be an Alpha with its own sub-system by loading n/alpha with .i suitably prepped by the node script. For example, suppose node c depends on nodes a and b. We want c to function as an Alpha for a further set of nodes d, e, and f.

Define the c script as:

	.i:(port;dir;max)	/ e.g. ("1001";"def"), max defaulted

Then

	\l n/alpha

c (the node) completes when Omega in its subsystem is reached. At that point, c should send (according to whatever convention governs nodes in the outer system) the completion message.

A process is a node if it is connected to an Alpha. A process is an Alpha if it is connected to nodes. A process can be both a node and an Alpha.

Serialization

If Alpha is started with infinite max:

	k n/shell n/alpha 1000 m 0I

then nodes are opened on demand. For example, if there are 17 input nodes, then Alpha opens all 17. As messages arrive for new nodes, those nodes are opened immediately.

If Alpha is started with max = n:

	k n/shell n/alpha 1000 m 4

then at most four nodes are allowed to run simultaneously. For example, if there are 17 input nodes, then Alpha opens the first four. When the first of these nodes closes, the fifth input node is opened. Eventually, all input nodes have run. Meanwhile, as messages arrive for new nodes, these nodes are queued up, with priority determined by queue-time.

An interesting case is max = 1:

	k n/shell n/alpha 1000 n 1

In this case, the nodes are fully serialized.

Serialization can be used to prevent contention for disk or processor resources.

Scheduling

Nodes can use timers to wake up at an absolute or relative time (.t..A) or use a repeating timer to check state (.t..R).

A global schedule can be imposed by means of a Mu: check the Log to see whether nodes have completed according to schedule.

Hosts

By default, nodes run on the host of the Alpha process. If there are nodes to run on a remote host, these can be specified in m/host.k. For example, suppose there are nodes a, b, c, and d, Alpha is running on host ABC with port 1000, and b and d are to run on host XYZ.

Define m/host.k:

	b:(`XYZ;1234)
	d:(`XYZ;1234)

A host server with port 1234 must be running on host XYZ:

	k -i 1234

When Alpha needs to start b, it will send (`XYZ;1234) a message which causes the host server to start b with return address (`ABC;1000).

If the message fails, Alpha will retry.

Close events triggered by host servers are ignored by Alpha.

A possible remote-host architecture:

Alpha is running on host X. A Mu on host Y starts host servers Alpha expects on host Y. Mu detects host server failures and restarts them. Eventually, messages from Alpha will reach restarted host servers.

Example

The default node script:

	Done[.n.Nodes]:0
	done:{Done[x]:1;if[&/Done[];.t..A[_t+2]".k.end[]"]}
	end:{.n.node`done,.n.Node;.n.alpha`close,.n.Node}
	if[0=#.n.Nodes;.t..A[_t+2]".k.end[]"]

The omega script:

	Done[.n.Nodes]:0
	done:{Done[x]:1;if[&/Done[];.n.alpha(`start;)]}

The default script constructs a dictionary Done containing a boolean atom for each input node.

Alpha opens the inputs. The last line of the default node script tests True (as an input node, .n.Nodes is empty) and sets a timer: 2 seconds from now, run the end function.

The end function tells Alpha to relay the `done message to those nodes which have .n.Node as an input, and then sends the `close message.

Alpha receives a `done message from node n, and relays the message (`done;n) to all nodes which have n as input.

A non-input process receives a `done message: the done function is applied to n, causing Done n to be set to 1. If all variables in Done are 1, a timer is set: 2 seconds from now, run the end function.

Eventually, Omega is opened. The omega script has a slightly different version of the done function: if all incoming nodes are done, start the system.

API

  Alpha:	3:(`close;node)		execute .m.c of node
		3:(`stop;)		execute \\ in Alpha
		3:(`start;)		close all nodes and restart

		(Dir;Dag;Nodes)4:_n
		(Dir;ante node)4:node
		r:4:x

		+ functions in m/alpha.k

  node:		.n.Node			self
		.n.Nodes		antecedent nodes
		.n.Dir			directory

		.n.node(`f;a)		f . a in dependent nodes
		.n.alpha(`f;a)		f . a in Alpha

		.m.c:"\\\\"

		+ functions in m/node.k

  Mu:		.n.mu.Alpha		Alpha handle
		.n.mu.Dir		Alpha's directory
		.n.mu.Dag		Alpha's dag
		.n.mu.Log		Alpha's log table
		.n.mu.Nodes		nodes in system

		.n.mu.post x		nodes dependent on x
		.n.mu.ante x		nodes on which x depends
		.n.mu.leaf[]		leaves system

		.n.mu.alpha(`f;a)	f . a in Alpha

		.m.c:"\\\\"

Scripts

        n/			Node system
	  shell.k		system script
	  alpha.k		Alpha server
	  node.k		node client
	  mu.k			Mu client
	  timers.k		timers
	  dag.k			dag functions

State

        Alpha		_w of Alpha
	Max		maximum number of concurrent nodes
	Dir		node directory
	Nodes		a vector of nodes in the system
	Dag		a list of (post_nodes;ante_nodes)
	Who		Who[node] = _w:  0 (closed), 0N (opened), >0 (connected)
	Log		table of log messages
	Mu		vector of Mu handles
	Host		Host[node]: _n (local), (`host;port) (host server)
	Open		vector of nodes to be opened
	Out		Out[node] = queued messages to node