3 Test buftee functionality.
5 __author__
= 'Emil Mikulic'
15 assert os
.path
.exists(buftee
), 'can\'t find buftee (path was %s)' % repr(buftee
)
17 def wait_for_read(fd
, max_wait_sec
= 0.1):
18 r
, _
, _
= select
.select([fd
], [], [], max_wait_sec
)
22 """Set a file descriptor to be non-blocking. Don't readline() after this."""
23 flags
= fcntl
.fcntl(fd
, fcntl
.F_GETFL
)
24 fcntl
.fcntl(fd
, fcntl
.F_SETFL
, flags | os
.O_NONBLOCK
)
27 class BufteeTestCase(unittest
.TestCase
):
28 def popen2(self
, cmd
, bufsize
=-1):
29 #stdout, stdin, stderr = popen2.popen3(buftee)
30 self
.child_stdout
, self
.child_stdin
= popen2
.popen2(cmd
, bufsize
)
31 unblock(self
.child_stdout
)
35 ret
= os
.write(self
.child_stdin
.fileno(), buf
)
37 self
.assertEqual(ret
, len(buf
), 'short write!')
41 ret
= self
.child_stdout
.read()
45 def assertRead(self
, what
):
46 self
.assertTrue(wait_for_read(self
.child_stdout
))
47 self
.assertEqual(what
, self
.read())
49 def assertReadEOF(self
):
50 self
.assertTrue(wait_for_read(self
.child_stdout
))
51 self
.assertEqual('', self
.read())
54 self
.child_stdin
.close()
57 def testWriteRead(self
):
65 def testWriteCloseRead(self
):
73 def testTwoWrites(self
):
80 self
.assertRead(msg1
+msg2
)
83 if __name__
== '__main__':