==============================================================================
|                           A P P E N D I X   1                             |
==============================================================================

//{{{ stamp
Date: Mon, 13 May 96 00:40:59 BST
From: phw
To: occam-com@ukc.ac.uk
Subject: Java threads and occam processes
//}}}

//{{{ SUMMARY
I finally read Sun's implementation of its Piped I/O Stream classes properly
and think I now understand it.  There is a simple explanation of why Oyvind's
ComsTime program cycles every 1.5 to 2.5 seconds and a simple way to reduce
that to around 30 milliseconds (for a UNIX Java platform).

The Piped I/O Streams implement blocking FIFO communication (rather clumsily)
and not an occam channel.  Enclosed are *better* classes both for a proper
occam channel and for a blocking FIFO.  Both of these reduce the ComsTime
cycle time to around 5.5 milliseconds.  That's roughly a 1000-fold speedup
on using Piped I/O Streams ... it's now only 1000 times slower than (KRoC)
occam running on the same UNIX workstation ...
//}}}

//{{{ the trouble with Piped I/O Stream
Well, it's too complicated and inefficient:

//{{{ PipedOutputStream is only a front for PipedInputStream
The out method in PipedOutputStream is a front for the receive method in
PipedInputStream.  PipedOutputStream can therefore be discarded in favour of
giving public access to that method.  Then, we need only the concept of
`PipedStream' and life will be simpler.

The only reason I can think of for distinguishing PipedInputStream from
PipedOutputStream is so that we can give the `PipedStream' to a writer so
that it can't see the read method, and vice-versa.  That has some merit.
But a simpler way to do that would be to derive a `PipedOutputStream' class
from `PipedStream' that simply hides its read method (and vice-versa).
I need to check whether Java lets us do that kind of thing ... ???
//}}}

//{{{ the code for the circular buffer is weird
PipedInputStream contains a circular buffer.  It maintains pointers to the
next free slot (`in') and the oldest slot (`out') and tries to use these to
determine whether the buffer is full, empty or neither.  This results in some
pretty obscure code, since `(in == out)' could mean either empty or full.
The code could be made crystal clear by also maintaining a size count.
//}}}

//{{{ the Thread variables don't seem to be properly maintained
PipedInputStream declares two Thread variables (`readSide' and `writeSide'),
but don't panic ... no new threads are set up with them ... they are only
used to get handles on the (latest) threads that are calling the read/write
methods.  With these handles, a blocked reader can ask if the last writer is
still alive etc ... but these checks look a bit hit-or-miss to me ... so I
wouldn't put too much faith in them ... e.g. the last writer may have died,
but there could still be other writers who are alive?
//}}}

//{{{ blocking is implemented by `busy' waiting loops
This is the killer so far as overheads and latency are concerned!

When the buffer is empty, a reader can't read and must be blocked.  When the
buffer is full, a writer can't write and must be blocked.  This blocking is
implemented by a `busy' waiting loop on the emptiness (respectively fullness)
of the buffer.  The body of this loop contains a "wait (1000)" call that puts
it to sleep for (at least) 1 second ... the parameter to "wait" representing
milliseconds.  The waiting loop, therefore, checks its exit condition at most
once per second!  I wonder what Sun had in mind when they wrote this?
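
To make the cost concrete, here is a minimal sketch of that once-per-second
polling pattern -- my own illustration, NOT Sun's code; the class name
`PollingBuffer' and its fields are invented.  The only way out of either loop
is for the timed wait to expire, so every communication can cost up to a
second of latency:

//{{{ PollingBuffer.java (illustrative sketch only -- not Sun's code)
import java.lang.InterruptedException;

public class PollingBuffer {

  private int hold;
  private boolean empty = true;

  public synchronized void write (int n) throws InterruptedException {
    while (!empty) {
      wait (1000);        // poll for room at most once per second
    }
    hold = n;
    empty = false;        // nobody is notified -- the reader just polls
  }

  public synchronized int read () throws InterruptedException {
    while (empty) {
      wait (1000);        // poll for data at most once per second
    }
    empty = true;         // nobody is notified -- the writer just polls
    return hold;
  }

}
//}}}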
//{{{ effect on the ComsTime benchmark
The ComsTime program has four processes trying to communicate through these
blocking buffers.  The buffers have 1024 elements and never get full ...
there is only one packet flying around the feedback loop in the benchmark,
so the buffers never hold more than one element.

BUT, they do get empty.  Almost all reads will find an empty buffer and, so,
drop into the once-per-second waiting loop.  Fortunately, most of these waits
are in parallel with each other.  To complete a cycle, a packet has to get
through four buffers.  This completely explains the cycle times for ComsTime,
which range between 1.2 and 2.5 seconds.
//}}}

//{{{ reducing the wait to the minimum
The minimum wait settable is "wait (0, 1)", which represents one nanosecond.
However, Java on UNIX platforms (like KRoC/SPoC on UNIX platforms) will have
to use timers provided by UNIX.  Unless they know something we don't (and
they may), the best UNIX provides are time values that may be expressed in
nanoseconds, but which are actually incremented only every 10 milliseconds.
Therefore, a "wait (0, 1)" will be effectively the same as a wait pretty
close to "wait (10)".  Timers on other platforms (e.g. Windows or the real
Java processor from Sun) will hopefully be incremented more frequently ...

On our UNIX platform (a Sun SPARC ???), all times between "wait (0, 1)" and
"wait (9, 99999)" give the same results for the ComsTime cycle ... which is
between 30.3 and 30.8 milliseconds.  There is reason to believe (see below)
that Java can do this in around 5.5 milliseconds, so the above results imply
that the four parallel waits only partially overlap, giving roughly 2.5
sequential 10 millisecond waits.  This seems very plausible.
//}}}

//{{{ deleting the wait won't work
Reducing the wait to zero or simply deleting it doesn't work.  Without the
wait, the reader remains in the busy-waiting loop continuously.  While it
loops, it has control of the object monitor ... the loop is (and has to be)
inside a `synchronized' method.  Since it never releases this monitor, the
writer can't get in to change the empty-condition upon which the reader is
waiting ... and we have livelock!
//}}}

Any occamist could program a block properly (i.e. without `busy' waiting).
See below!
//}}}
//}}}

//{{{ quick summary of Java threads
//{{{ disclaimer
This is my understanding of the thread methods used below.  This may be
completely wrong ... if anyone knows, please tell me!  It's based on the tiny
amount of information gleaned from 5 text books and the Java `white pages'
report, an intuitive guess as to what the method names might really be doing,
and the fact that it seems to work.
//}}}

//{{{ Java threads/objects versus occam processes
Java threads and objects are distinct things.  An object has data and
methods.  A thread can snake around anywhere, calling a method of one object
some time and a method of a different object another time.  Object methods
are not normally under the control of the object, but are potential fragments
of any threads that care to call them.  This seems to be pretty much standard
across all object-oriented languages.  My problem arising from this is that
object methods just don't seem `object-oriented' to me ...

What we are trying to do is approach the occam model, where an object has its
own data and its own thread (or threads) that live entirely inside the object
and give it its life.  This does seem a much more `object-oriented' scheme of
things!  Reconciling these philosophies is tricky.
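
To show the contrast in code -- a tiny sketch with invented class names,
nothing from the standard libraries -- here is a `passive' object, whose
methods are just fragments of whichever threads call them, alongside an
`active' object whose own thread of control lives inside it:

//{{{ passive versus active objects (illustrative sketch)
// Passive object: data plus methods, but no life of its own.  Any thread
// holding a reference may snake through its methods at any time.
class Account {

  private int balance = 0;

  public synchronized void deposit (int n) {
    balance = balance + n;
  }

  public synchronized int balance () {
    return balance;
  }

}

// Active object (closer to an occam process): it extends Thread, starts
// itself, and its own thread of control lives entirely inside run ().
class Clock extends Thread {

  private long ticks = 0;

  Clock () {
    start ();
  }

  public void run () {
    while (true) {
      try {
        sleep (1000);                  // one tick per second
      } catch (InterruptedException caught) {
        return;
      }
      synchronized (this) {
        ticks = ticks + 1;
      }
    }
  }

  public synchronized long ticks () {
    return ticks;
  }

}
//}}}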
What would be good is to find some mechanisms for working with Java that
approach the occam model and, hence, give us semantic clarity.  That would
be a most valuable contribution from the occam community.
//}}}

//{{{ yield, suspend and resume
Executing `yield ()' is like executing `RESCHEDULE()', or `PAR (SKIP, SKIP)',
in occam.  It just reschedules the current thread to the back of the
run-queue.

Executing `suspend ()' deschedules the current thread.  To reschedule it,
another thread has to execute a `name.resume ()', where `name' is a thread
variable holding the suspended thread (a bit like a transputer process
descriptor).  This other thread has to have access to that `name' variable.

Because Java threads can be descheduled any time the underlying threads
kernel sees fit, data accessible by two or more threads cannot be safely
managed by those threads using just these methods ... (?)  I've not used
them at all in the implementation of the occam channel.
//}}}

//{{{ synchronised methods (and synchronised blocks of code)
Object data being accessed by multiple threads needs protection.  Methods
that may be called by multiple threads to read/update object data should be
defined as `synchronized'.  Synchronized methods for any particular object
are controlled through a monitor -- i.e. only one such synchronized method
(per object) can be executing at any time.

A thread that calls a synchronized method of some object is put on the
SYNC-QUEUE for that object.  So each object maintains a SYNC-QUEUE through
which it can control use of its synchronized methods.  Once a thread gets to
the front of an object's SYNC-QUEUE and starts executing the synchronized
method, it normally completes the method and goes on to do something else.
As it completes the method, the next thread on the SYNC-QUEUE will be
scheduled to execute another of that object's synchronized methods.
//}}}

//{{{ wait/notify
However, during execution of a synchronized method, a thread may execute a
`wait ()'.  This -- and here my understanding is guesswork -- causes that
thread to block and to be put on a WAIT-QUEUE (also belonging to the object
whose synchronized method was being executed).  At which point, the monitor
is released and another thread on the SYNC-QUEUE for the object is scheduled.

To unblock a thread on the WAIT-QUEUE of an object, another thread has to
execute a `notify ()' method for that object.  This puts the first thread on
the WAIT-QUEUE of that object back on to the SYNC-QUEUE for that object.
When its turn comes, it regains the monitor and resumes execution of the
synchronised method at the point just after it had executed the `wait ()'.
[Can these semantics really be right ... they seem to work though ???]

The thread executing the `notify ()' usually does it during a synchronized
method -- because only then does it have the information necessary to decide
on such a release.  The thread doesn't know which thread is released and had
better not care.  I imagine the thread executing the `notify ()' continues
running -- i.e. retains control of the monitor -- but I'm not sure.  It would
simplify coding if that were definitely true ... (Help please!).
//}}}

//{{{ more on wait/notify
A thread may also execute a `wait (millisecs)' or `wait (millisecs,
nanosecs)'.  This seems to put the thread on two queues: a TIMER-QUEUE
(global to all threads) and the WAIT-QUEUE (for the object whose synchronized
method was being executed).
Such a thread gets back on to the SYNC-QUEUE for the object either by the
time-out occurring or by being at the front of the WAIT-QUEUE when someone
else executes the `notify ()' method for that object.

There is also a `notifyAll' method which releases all threads on the
WAIT-QUEUE back on to the SYNC-QUEUE for the object.  Executing a `notify ()'
or `notifyAll' when the WAIT-QUEUE is empty is OK, but is probably only done
by the desperate (?).  Executing a `wait ()' or `notify ()' outside a
synchronized method does ... ?
//}}}

//{{{ putting it together
A thread may be:

  running : either actually running or on the run-queue (which, from the
            point of view of designing code, is the same thing);

  blocked : on the SYNC-QUEUE for some object (either through trying to
            execute one of its synchronized methods when some other thread
            got there first ... or through having been through a wait/notify
            cycle);

  blocked : on the WAIT-QUEUE for some object (and, possibly, on a global
            TIMER-QUEUE as well).  This happens by calling a wait method
            whilst inside a synchronized method.

Humm ... :-) ... ?!!
//}}}
//}}}

//{{{ a zero-buffered synchronised (i.e. occam) channel class for Java
It *is* possible to implement a fully synchronised zero-buffered occam
channel in Java without busy-waiting.  Consider:

//{{{ CHAN_OF_INT.java
import java.lang.InterruptedException;  // needed by the standard wait method

public class CHAN_OF_INT {

  //{{{ COMMENT documentation
  //
  //CHAN_OF_INT implements an occam CHAN OF INT.  There is full synchronisation
  //between the outputting and inputting thread.  It assumes that the connection
  //is point-to-point ... no checks are made on this, but there could be!
  //
  //There is no logical buffering of data in the channel.  However, each int
  //is actually copied three or four times with this implementation!  I can't
  //see how to do this with less copying.  Ideally, we ought to copy the message
  //directly from the outputting thread to the inputting thread ... just like
  //in occam ...
  //
  //}}}

  //{{{ local state
  int channel_hold;
  boolean channel_empty = true;
  //}}}

  //{{{ public synchronized void out (int n) throws InterruptedException {
  public synchronized void out (int n) throws InterruptedException {
    if (channel_empty) {
      //{{{ first to the rendezvous
      channel_empty = false;
      channel_hold = n;              // second copy of the message
      wait ();                       // wait for the input process
      channel_empty = true;
      //}}}
    } else {
      //{{{ second to the rendezvous
      channel_hold = n;              // can we copy this directly to the receiver?
      notify ();                     // schedule the waiting input process
      //}}}
    }
  }
  //}}}

  //{{{ public synchronized int in () throws InterruptedException {
  public synchronized int in () throws InterruptedException {
    if (channel_empty) {
      //{{{ first to the rendezvous
      channel_empty = false;
      wait ();                       // wait for the output process
      channel_empty = true;
      return channel_hold;           // third copy of the message
      //}}}
    } else {
      //{{{ second to the rendezvous
      int temporary = channel_hold;  // can this be avoided?
      notify ();                     // schedule the waiting output process
      return temporary;
      //}}}
    }
  }
  //}}}

}
//}}}

It assumes it is used for point-to-point connection only -- i.e. no multiple
readers or multiple writers.  It could be extended to allow that though.  But
as it stands, it is very simple.  It just follows, as best it can, the way
channels are implemented on the transputer.
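
As a warm-up before the full benchmark, here is a minimal usage sketch -- the
class names `Squares' and `PrintFirstTen' are invented for this example --
showing one writer thread and one reader thread rendezvousing over a single
CHAN_OF_INT:

//{{{ minimal usage sketch (invented names)
// The writer is an active object in the style used below: it starts itself
// and spends its life outputting on its channel.
class Squares extends Thread {

  private CHAN_OF_INT out;

  Squares (CHAN_OF_INT out) {
    this.out = out;
    start ();
  }

  public void run () {
    try {
      for (int i = 0; true; i++) {
        out.out (i * i);              // out ! i * i
      }
    } catch (InterruptedException caught) {
    }
  }

}

// The reader is the main thread: whichever side reaches the channel first
// waits for the other, just as with an occam channel.
class PrintFirstTen {

  public static void main (String argv []) {
    CHAN_OF_INT c = new CHAN_OF_INT ();
    Squares squares = new Squares (c);
    try {
      for (int i = 0; i < 10; i++) {
        System.out.println (c.in ()); // c ? value ... and print it
      }
    } catch (InterruptedException caught) {
    }
    squares.stop ();                  // Java 1.0 style, as in ComsTime below
  }

}
//}}}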
Oyvind's version of the context-switch benchmark (I've just changed some
names and made the consumer process an object/thread on an equal footing with
the others) becomes:

//{{{ ComsTime.java
import java.util.*;

//{{{ PROC Prefix (VAL INT n, CHAN OF INT in, out)
// Oyvind Teig      20.3.96
// modified Welch    8.5.96

class Prefix extends Thread {

  private CHAN_OF_INT in;
  private CHAN_OF_INT out;
  private int n;

  //{{{ constructor
  Prefix (int n, CHAN_OF_INT in, CHAN_OF_INT out) {
    // setName ("Prefix");
    this.n = n;
    this.in = in;
    this.out = out;
    start ();
  }
  //}}}

  //{{{ run
  public void run () {
    // System.out.println ("  " + getName() + " " + toString());
    try {
      //{{{ main code in here
      out.out (n);                     // SEQ
      while (true)                     //   out ! n
      {                                //   WHILE TRUE
        int value;                     //     INT value:
        value = in.in ();              //     SEQ
        out.out (value);               //       in ? value
      }                                //       out ! value
      //}}}
    }
    catch (InterruptedException caught) {
      System.out.println ("  " + getName() + " method run exception");
    }
  }
  //}}}

}
//}}}

//{{{ PROC Delta (CHAN OF INT in, CHAN OF INT out.0, out.1)
// Oyvind Teig      20.3.96
// modified Welch    8.5.96

class Delta extends Thread {

  private CHAN_OF_INT in;
  private CHAN_OF_INT out_0;
  private CHAN_OF_INT out_1;

  //{{{ constructor
  Delta (CHAN_OF_INT in, CHAN_OF_INT out_0, CHAN_OF_INT out_1) {
    // setName ("Delta");
    this.in = in;
    this.out_0 = out_0;
    this.out_1 = out_1;
    start ();
  }
  //}}}

  //{{{ run
  public void run () {
    // System.out.println ("  " + getName() + " " + toString());
    try {
      //{{{ main code in here
      while (true)                     // WHILE TRUE
      {                                //   INT value:
        int value;                     //   SEQ
        value = in.in ();              //     in ? value
        out_0.out (value);             //     out.0 ! value  -- should be
        out_1.out (value);             //     out.1 ! value  -- in PAR !!
      }
      //}}}
    }
    catch (InterruptedException caught) {
      System.out.println ("  " + getName() + " method run exception");
    }
  }
  //}}}

}
//}}}

//{{{ PROC Succ (CHAN OF INT in, CHAN OF INT out)
// Oyvind Teig      20.3.96
// modified Welch    8.5.96

class Succ extends Thread {

  private CHAN_OF_INT in;
  private CHAN_OF_INT out;

  //{{{ Succ
  Succ (CHAN_OF_INT in, CHAN_OF_INT out) {
    // setName ("Succ");
    this.in = in;
    this.out = out;
    start ();
  }
  //}}}

  //{{{ run
  public void run () {
    // System.out.println ("  " + getName() + " " + toString());
    try {
      //{{{ main code in here
      while (true)                     // WHILE TRUE
      {                                //   INT value:
        int value;                     //   SEQ
        value = in.in ();              //     in ? value
        out.out (value + 1);           //     out ! value + 1
      }
      //}}}
    }
    catch (InterruptedException caught) {
      System.out.println ("  " + getName() + " method run exception");
    }
  }
  //}}}

}
//}}}

//{{{ PROC Consume (INT nLoops, CHAN OF INT in, out)
// Oyvind Teig      20.3.96
// modified Welch    8.5.96

class Consume extends Thread {

  private CHAN_OF_INT in;
  private CHAN_OF_INT out;
  private int nLoops;

  //{{{ constructor
  Consume (int nLoops, CHAN_OF_INT in, CHAN_OF_INT out) {
    // setName ("Consume");
    this.in = in;
    this.out = out;
    this.nLoops = nLoops;
    start ();
  }
  //}}}

  //{{{ run
  public void run () {
    // System.out.println ("  " + getName() + " " + toString());
    try {
      //{{{ main code in here
      //{{{ warm-up loop
      int warm_up = 16;                   // VAL INT warm.up IS 16:
      for (int i = 0; i < warm_up; i++) { // SEQ i = 0 FOR warm.up
        int value;                        //   INT value:
        value = in.in ();                 //   in ? value
        //{{{ COMMENT System.out.println (value);
        //System.out.println (value);
        //}}}
      }
      //}}}
      //{{{ Time tag
      Date date1 = new Date();
      //}}}
      //{{{ bench-mark loop
      for (int i = 0; i < nLoops; i++) {  // SEQ i = 0 FOR nLoops
        int value;                        //   INT value:
        value = in.in ();                 //   in ? value
        //{{{ COMMENT System.out.println (value);
        //System.out.println (value);
        //}}}
      }
      //}}}
      //{{{ Time tag
      Date date2 = new Date();
      //}}}
      //{{{ Report
      long microseconds = ((date2.getTime() - date1.getTime()) * 1000);
      long timePerLoop_us = (microseconds / ((long) nLoops));
      System.out.println ("  " + timePerLoop_us + " microseconds / iteration");
      //}}}
      //{{{ signal main thread that we're done
      out.out (0);
      //}}}
      //}}}
    }
    catch (InterruptedException caught) {
      System.out.println ("  " + getName() + " method run exception");
    }
  }
  //}}}

}
//}}}

//{{{ main program thread
// Oyvind Teig      20.3.96
// modified Welch    8.5.96

class ComsTime {

  //{{{ public static void main (String argv [])
  public static void main (String argv []) {
    //{{{ Banner
    System.out.println ("");
    System.out.println ("Test of communication between Java threads");
    System.out.println ("Based on occam ComsTime.occ by Peter Welch, University of Kent at Canterbury");
    System.out.println ("Ported into Java by Oyvind Teig");
    System.out.println ("Now using CHAN_OF_INT (phw)");
    System.out.println ("");
    //}}}
    //{{{ nLoops
    int nLoops = 1000;
    System.out.println (nLoops + " loops:");
    //}}}
    //{{{ CHAN OF INT a, b, c, d, e:
    CHAN_OF_INT a = new CHAN_OF_INT ();
    CHAN_OF_INT b = new CHAN_OF_INT ();
    CHAN_OF_INT c = new CHAN_OF_INT ();
    CHAN_OF_INT d = new CHAN_OF_INT ();
    CHAN_OF_INT e = new CHAN_OF_INT ();
    //}}}
    //{{{ PAR (prefix, delta, succ, consume)
    // PAR
    Prefix prefix = new Prefix (0, b, a);          //   prefix (0, b, a)
    Delta delta = new Delta (a, c, d);             //   delta (a, c, d)
    Succ succ = new Succ (c, b);                   //   succ (c, b)
    Consume consume = new Consume (nLoops, d, e);  //   consume (nLoops, d, e)
    //}}}
    //{{{ wait for the all done signal
    try {
      int done = e.in ();
    }
    catch (InterruptedException caught) {
      System.out.println ("  " + "ComsTime" + " method run exception");
    }
    //}}}
    //{{{ Stop threads
    prefix.stop ();
    delta.stop ();
    succ.stop ();
    consume.stop ();
    //}}}
    //{{{ join
    try {
      prefix.join ();
      delta.join ();
      succ.join ();
      consume.join ();
    }
    catch (InterruptedException caught) {
      System.out.println ("  " + "ComsTime" + " final join interrupted");
    }
    //}}}
  }
  //}}}

}
//}}}
//}}}

Performance times reported are around 5.5 milliseconds per cycle.
//}}}

//{{{ a blocking FIFO class for Java
The following is an attempt to allow multiple readers/writers and cope with
their blocking in a non-busy way.  The attempt is only partially successful.

//{{{ tiny problem of divergence
There's something that looks suspiciously like a busy loop (see the folds
marked "... wait till there is something/room").  The thread won't go round
this loop a second time UNLESS it's beaten to the monitor by a competitive
reader (if it was a reader) or a competitive writer (if it was a writer).
It is sadly possible for a blocked reader to be overtaken (infinitely often)
by readers that come along after it ... same for a writer.
//}}}

It's safe and non-busy for a single reader/writer though ... need to think
some more on this ...

//{{{ BUFFER_OF_INT.java
import java.io.*;
import java.util.*;
import java.lang.*;

public class BUFFER_OF_INT {

  //{{{ COMMENT documentation
  //
  //BUFFER_OF_INT implements a blocking FIFO buffer of integers.  A fixed size
  //is defined upon initialisation.  There can be any number of concurrent
  //readers and writers.  Readers are blocked when the buffer is empty.  Writers
  //are blocked when the buffer is full.  A non-empty buffer will not refuse
  //a reader.  A non-full buffer will not refuse a writer.
  //
  //The buffer should also be `fair'.
  //
  //CAUTION: above is the specification ... this implementation is not fair!
  //I think a reader may be overtaken by new readers and that this could happen
  //indefinitely ... same for writers ... needs fixing ...
  //
  //Meanwhile, this implementation should be safe for non-competitive readers
  //and writers -- e.g. one of each.
  //
  //}}}

  //{{{ local state
  int[] buffer;
  int max;
  int size = 0;             // INVARIANT: (0 <= size <= max)
  int hi = 0;               // INVARIANT: (0 <= hi < max)
  int lo = 0;               // INVARIANT: (0 <= lo < max)
  int blocked_readers = 0;
  int blocked_writers = 0;
  //}}}

  //{{{ constructor
  BUFFER_OF_INT (int max) {
    //{{{
    this.max = max;
    buffer = new int[max];
    //}}}
  }
  //}}}

  //{{{ public synchronized void write (int n) throws InterruptedException {
  public synchronized void write (int n) throws InterruptedException {
    //{{{
    //{{{ wait till there is room
    while (size == max) {
      blocked_writers++;
      wait ();
    }
    //}}}
    //{{{ update buffer
    buffer[hi] = n;
    hi = (hi + 1) % max;
    size++;
    //}}}
    //{{{ schedule any blocked reader (just one)
    if (blocked_readers > 0) {
      blocked_readers--;
      notify ();
    }
    //}}}
    //}}}
  }
  //}}}

  //{{{ public synchronized int read () throws InterruptedException {
  public synchronized int read () throws InterruptedException {
    //{{{
    int temporary;
    //{{{ wait till there is something
    while (size == 0) {
      blocked_readers++;
      wait ();
    }
    //}}}
    //{{{ update buffer
    temporary = buffer[lo];
    lo = (lo + 1) % max;
    size--;
    //}}}
    //{{{ schedule any blocked writer (just one)
    if (blocked_writers > 0) {
      blocked_writers--;
      notify ();
    }
    //}}}
    return temporary;
    //}}}
  }
  //}}}

}
//}}}

This can be used in the ComsTime program simply by changing all "CHAN_OF_INT"
to "BUFFER_OF_INT" (supplying each constructor with a buffer size), by
changing the "in" method calls to "read" and by changing the "out" method
calls to "write".  Performance times reported are again around 5.5
milliseconds per cycle.
//}}}

//{{{ discussion
More thinking needs to be done ... discussion postponed!  Except to say:

  o  the occam code is much simpler and shorter;

  o  the occam code executes ComsTime about 1000 times faster;

  o  non-busy-looping Java code for secure fair multiple readers/writers
     needs finding (and is pretty hard to reason through correctly ...);

  o  non-busy-looping occam code for secure fair multiple readers/writers
     is trivial (especially so if we implement occam3 SHARED channels).

Too early for any conclusions to be drawn ...
//}}}

Peter Welch.