+#!/usr/bin/env python
+"""
+Test buftee functionality.
+"""
+__author__ = 'Emil Mikulic'
+
+import unittest
+import popen2
+import os
+import fcntl
+import signal
+import select
+
+buftee = './buftee'
+assert os.path.exists(buftee), 'can\'t find buftee (path was %s)' % repr(buftee)
+
+def wait_for_read(fd, max_wait_sec = 0.1):
+ r, _, _ = select.select([fd], [], [], max_wait_sec)
+ return r == [fd]
+
+def unblock(fd):
+ """Set a file descriptor to be non-blocking. Don't readline() after this."""
+ flags = fcntl.fcntl(fd, fcntl.F_GETFL)
+ fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
+
+
+class BufteeTestCase(unittest.TestCase):
+ def popen2(self, cmd, bufsize=-1):
+ #stdout, stdin, stderr = popen2.popen3(buftee)
+ self.child_stdout, self.child_stdin = popen2.popen2(cmd, bufsize)
+ unblock(self.child_stdout)
+
+ def write(self, buf):
+ signal.alarm(1)
+ ret = os.write(self.child_stdin.fileno(), buf)
+ signal.alarm(0)
+ self.assertEqual(ret, len(buf), 'short write!')
+
+ def read(self):
+ signal.alarm(1)
+ ret = self.child_stdout.read()
+ signal.alarm(0)
+ return ret
+
+ def assertRead(self, what):
+ self.assertTrue(wait_for_read(self.child_stdout))
+ self.assertEqual(what, self.read())
+
+ def assertReadEOF(self):
+ self.assertTrue(wait_for_read(self.child_stdout))
+ self.assertEqual('', self.read())
+
+ def close(self):
+ self.child_stdin.close()
+
+
+ def testWriteRead(self):
+ self.popen2(buftee)
+ msg = 'hello!\n'
+ self.write(msg)
+ self.assertRead(msg)
+ self.close()
+ self.assertReadEOF()
+
+ def testWriteCloseRead(self):
+ self.popen2(buftee)
+ msg = 'hello!\n'
+ self.write(msg)
+ self.close()
+ self.assertRead(msg)
+ self.assertReadEOF()
+
+ def testTwoWrites(self):
+ self.popen2(buftee)
+ msg1 = 'hello\n'
+ msg2 = 'world.\n'
+ self.write(msg1)
+ self.write(msg2)
+ self.close()
+ self.assertRead(msg1+msg2)
+ self.assertReadEOF()
+
+if __name__ == '__main__':
+ unittest.main()