First cut at test.
[buftee] / tests / test.py
diff --git a/tests/test.py b/tests/test.py
new file mode 100755 (executable)
index 0000000..810123c
--- /dev/null
@@ -0,0 +1,84 @@
+#!/usr/bin/env python
+"""
+Test buftee functionality.
+"""
+__author__ = 'Emil Mikulic'
+
+import unittest
+import popen2
+import os
+import fcntl
+import signal
+import select
+
+buftee = './buftee'
+assert os.path.exists(buftee), 'can\'t find buftee (path was %s)' % repr(buftee)
+
+def wait_for_read(fd, max_wait_sec = 0.1):
+  r, _, _ = select.select([fd], [], [], max_wait_sec)
+  return r == [fd]
+
+def unblock(fd):
+  """Set a file descriptor to be non-blocking.  Don't readline() after this."""
+  flags = fcntl.fcntl(fd, fcntl.F_GETFL)
+  fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
+
+
+class BufteeTestCase(unittest.TestCase):
+  def popen2(self, cmd, bufsize=-1):
+    #stdout, stdin, stderr = popen2.popen3(buftee)
+    self.child_stdout, self.child_stdin = popen2.popen2(cmd, bufsize)
+    unblock(self.child_stdout)
+
+  def write(self, buf):
+    signal.alarm(1)
+    ret = os.write(self.child_stdin.fileno(), buf)
+    signal.alarm(0)
+    self.assertEqual(ret, len(buf), 'short write!')
+
+  def read(self):
+    signal.alarm(1)
+    ret = self.child_stdout.read()
+    signal.alarm(0)
+    return ret
+
+  def assertRead(self, what):
+    self.assertTrue(wait_for_read(self.child_stdout))
+    self.assertEqual(what, self.read())
+
+  def assertReadEOF(self):
+    self.assertTrue(wait_for_read(self.child_stdout))
+    self.assertEqual('', self.read())
+
+  def close(self):
+    self.child_stdin.close()
+
+
+  def testWriteRead(self):
+    self.popen2(buftee)
+    msg = 'hello!\n'
+    self.write(msg)
+    self.assertRead(msg)
+    self.close()
+    self.assertReadEOF()
+
+  def testWriteCloseRead(self):
+    self.popen2(buftee)
+    msg = 'hello!\n'
+    self.write(msg)
+    self.close()
+    self.assertRead(msg)
+    self.assertReadEOF()
+
+  def testTwoWrites(self):
+    self.popen2(buftee)
+    msg1 = 'hello\n'
+    msg2 = 'world.\n'
+    self.write(msg1)
+    self.write(msg2)
+    self.close()
+    self.assertRead(msg1+msg2)
+    self.assertReadEOF()
+
+if __name__ == '__main__':
+  unittest.main()