根据man 7 pipe
The communication channel provided by a pipe is a byte stream: there is no concept of message boundaries.
但是POSIX定义了:
POSIX.1 says that writes of less than PIPE_BUF bytes must be atomic。
the output data is written to the pipe as a contiguous sequence
在Linux上PIPE_BUF是一个page,也就算和4096字节。
POSIX定义的含义就是说:小于PIPE_BUF的写入是一个原子操作,这有几个隐含的含义:
1 两个人分别写入aabb和ccdd,那么这两个消息不会interleave。
不会出现aaccbbdd,只能出现aabbccdd,或者ccddaabb
2 写入是一次性全部写入,或者一个也没有写入,意味着空间不足时,write和阻塞。
下面我们测试阻塞式管道的读写特征。
背景:pipe的默认缓存大小是16个page,也就是64k。
搞一个测试程序,循环写入,查看写入情况
void test_write_less_pipe_buf(int fd[2])
{
int done = 0;
while (1) {
int ret;
ret = write(fd[1], buf, 4000);
if (ret < 0) {
perror("write pipe failed");
break;
}
else if (ret == 0) {
printf("write return 0\n");
break;
}
done += ret;
printf("write %d bytes total %d\n", ret, done);
}
}测试发现,单次写入4000个字节,写入到64000的时候阻塞,此时剩余65536-64000=1536,不足以写入一个完整的4000字节。
./a.out write 4000 bytes total 4000 write 4000 bytes total 8000 write 4000 bytes total 12000 write 4000 bytes total 16000 write 4000 bytes total 20000 write 4000 bytes total 24000 write 4000 bytes total 28000 write 4000 bytes total 32000 write 4000 bytes total 36000 write 4000 bytes total 40000 write 4000 bytes total 44000 write 4000 bytes total 48000 write 4000 bytes total 52000 write 4000 bytes total 56000 write 4000 bytes total 60000 write 4000 bytes total 64000 ^C
改成一次写入8000,结果令人意外,8000也是要么全部写入,要么阻塞,没有返回部分数据。
$ ./a.out write 8000 bytes total 8000 write 8000 bytes total 16000 write 8000 bytes total 24000 write 8000 bytes total 32000 write 8000 bytes total 40000 write 8000 bytes total 48000 write 8000 bytes total 56000 write 8000 bytes total 64000 ^C
这回写一个通用程序,可以起最多2个写线程,最多2个读线程。
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <pthread.h>
#include <string.h>
int buf[4][128 * 1024];
int size[4] = { 0 };
int fd[2];
void test_write(int i)
{
long done = 0;
int sz = size[i];
while (1) {
int ret = write(fd[1], buf[i], sz);
if (ret < 0) {
perror("wr pipe failed");
break;
}
else if (ret == 0) {
printf("wr return 0\n");
break;
}
if (ret != sz) {
fprintf(stderr, "partial write %d total %d\n", ret, done);
break;
}
done += ret;
printf("th %d (iosize %d) write %d bytes total %ld\n", i, sz, ret, done);
}
}
void test_read(int i)
{
long done = 0;
int sz = size[i];
while (1) {
int ret = read(fd[0], buf[i], sz);
if (ret < 0) {
perror("rd pipe failed");
break;
}
else if (ret == 0) {
printf("rd return 0\n");
break;
}
if (ret != sz) {
fprintf(stderr, "partial read %d total %ld\n", ret, done);
break;
}
if (memcmp(buf[0], buf[i], ret) && memcmp(buf[1], buf[i], ret)) {
fprintf(stderr, "interleave\n");
break;
}
done += ret;
printf("th %d (iosize %d) rd %d bytes total %ld\n", i, sz, ret, done);
}
}
void *fn(void *ctx)
{
int i = (long)ctx;
if (size[i] == 0)
return NULL;
if (i < 2)
test_write(i);
else
test_read(i);
}
int main(int argc, char **argv)
{
int ret;
long i;
for (i = 0; i < 4; i++)
memset(buf[i], i + 1, sizeof(buf[0]));
for (i = 1; i < argc && i < 5; i++)
size[i-1] = atoi(argv[i]);
ret = pipe(fd);
if (ret < 0)
perror("pipe failed");
for (i = 0; i < 4; i++) {
pthread_t tid;
ret = pthread_create(&tid, NULL, fn, (void *)i);
if (ret < 0)
perror("pthread_create() failed");
}
pause();
return 0;
}测试发现,
1 无论单次写入量小于还是大于PIPE_BUF,全部数据写入完成后才返回,不会返回部分数据。
不管怎么测试,partial write的打印都没触发过。
2 读完全是流式的,缓存里面有多少数据就会返回多少数据。
比如,写入4000,读取4200。那么第一次读取,可能就只读取到第一次写入的4000
$ ./a.out 4000 0 4200
th 0 (iosize 4000) write 4000 bytes total 4000
partial read 4000 total 0
3 如果写者的size 小于PIPE_BUF,读者的size等于写者,那么永远不会出现partial read
$ ./a.out 3123 0 3123 > /dev/null
执行这个命令,partial read的打印没有出现过。
4 第3种情况,就算有多个写者,读者也不会出现partial read,而且不会出现interleave。
此时看起来就不是流式了,而像message。这是POSIX的要求决定的特性。
写者是原子操作,而且不会interleave,这就是说buffer中永远有n个size大小的数据,而读者也是按
size去读,那么读到的就是一个个消息了。
5 如果写者写入的数据大小是变化的,读者按固定大小读,显然不能按消息读出来。
==============================================
6 如果写者的大小大于PIPE_BUF,读者的大小等于写着,那么会出现partial read
$ ./a.out 5000 0 5000 > /dev/null
partial read 904 total 100800000
partial read 4096 total 160565904
^C
写入到100MB数据的时候,出现了partial read,解释就是写者的写入动作被拆分了。
尽管要等全部数据写完才返回,但是再内部,写入是被拆分了的。
7 写者大于PIPE_BUF时,interleave轻松就发生了
$ ./a.out 5000 5000 5000 > /dev/null
interleave total 55000
^C