#include "u.h"
#include "../port/lib.h"
#include "mem.h"
#include "dat.h"
#include "fns.h"
#include "../port/error.h"
/*
 * State shared by every channel that refers to one pipe.
 */
typedef struct Pipe Pipe;
struct Pipe
{
/* anonymous lock member; code in this file takes it with qlock(p)/qunlock(p) around ref and qref updates */
QLock;
/* not referenced by any code visible in this file */
Pipe *next;
/* count of channels referring to this pipe; set to 1 at attach, the pipe is freed when it drops to 0 */
int ref;
/* unique id handed out from pipealloc.path at attach time */
ulong path;
/* q[0] is read through "data" and written through "data1"; q[1] is the reverse */
Queue *q[2];
/* open counts: qref[0] for "data" (Qdata0), qref[1] for "data1" (Qdata1) */
int qref[2];
};
/*
 * Source of unique pipe ids: pipeattach increments path while
 * holding the lock and stores the new value in the Pipe.
 */
struct
{
Lock;
ulong path;
} pipealloc;
/*
 * File types within a pipe directory.  The value is kept in the
 * low 5 bits of a qid path (see PIPETYPE/PIPEQID).
 */
enum
{
Qdir,
Qdata0,
Qdata1,
Qctl,
};
/*
 * Directory table handed to devwalk/devdirread together with
 * NPIPEDIR and pipegen.  Entry 0 is "." and is skipped by pipegen;
 * the lengths given here are 0 and pipegen substitutes the queue
 * length for the two data files.
 */
Dirtab pipedir[] =
{
".", {Qdir,0,QTDIR}, 0, DMDIR|0500,
"data", {Qdata0}, 0, 0600,
"data1", {Qdata1}, 0, 0600,
"ctl", {Qctl}, 0, 0600,
};
/* number of entries in pipedir, including "." */
#define NPIPEDIR 4
/*
 * A qid path packs the pipe id in the bits above bit 4 and the file
 * type (Qdir, Qdata0, ...) in the low 5 bits.  Arguments are fully
 * parenthesized so that the cast applies to the whole argument
 * expression rather than only to its first operand.
 */
#define PIPETYPE(x) (((unsigned)(x))&0x1f)
#define PIPEID(x) ((((unsigned)(x)))>>5)
#define PIPEQID(i, t) ((((unsigned)(i))<<5)|(t))
/*
 * Pick a default queue size for pipes when the configuration has
 * not supplied one: 256K when there is more than one processor,
 * 32K otherwise.
 */
static void
pipeinit(void)
{
	if(conf.pipeqsize != 0)
		return;
	conf.pipeqsize = conf.nmach > 1 ? 256*1024 : 32*1024;
}
/*
 * create a pipe: allocate the Pipe and both of its queues, give it
 * a fresh id and return a directory channel whose aux points at it.
 *
 * All allocations that can fail are done before devattach() so that
 * an exhausted("memory") error cannot strand a freshly attached
 * channel.  qref[] is not set here.
 * NOTE(review): if devattach() itself can raise an error, p and its
 * queues would be left allocated -- confirm it cannot.
 */
static Chan*
pipeattach(char *spec)
{
	Pipe *p;
	Chan *c;

	p = malloc(sizeof(Pipe));
	if(p == 0)
		exhausted("memory");
	p->ref = 1;

	p->q[0] = qopen(conf.pipeqsize, 0, 0, 0);
	if(p->q[0] == 0){
		free(p);
		exhausted("memory");
	}
	p->q[1] = qopen(conf.pipeqsize, 0, 0, 0);
	if(p->q[1] == 0){
		free(p->q[0]);
		free(p);
		exhausted("memory");
	}

	lock(&pipealloc);
	p->path = ++pipealloc.path;
	unlock(&pipealloc);

	/* nothing below raises an allocation error, so c cannot leak */
	c = devattach('|', spec);
	mkqid(&c->qid, PIPEQID(3*p->path, Qdir), 0, QTDIR);
	c->aux = p;
	return c;
}
/*
 * Directory generator for devwalk/devdirread.
 * Fills *dp for ".." (i == DEVDOTDOT) or for the i'th file of the
 * pipe directory and returns 1; returns -1 when there is no table
 * or i is past its end.  The two data files report the current
 * length of their queue; other entries report the table's length.
 */
static int
pipegen(Chan *c, char*, Dirtab *tab, int ntab, int i, Dir *dp)
{
	Dirtab *ent;
	Pipe *pp;
	Qid qid;
	int length;

	if(i == DEVDOTDOT){
		devdir(c, c->qid, "#|", 0, eve, DMDIR|0555, dp);
		return 1;
	}

	/* slot 0 of the table is ".", so file i lives at index i+1 */
	i++;
	if(tab == 0)
		return -1;
	if(i >= ntab)
		return -1;
	ent = &tab[i];

	pp = c->aux;
	if((ulong)ent->qid.path == Qdata0)
		length = qlen(pp->q[0]);
	else if((ulong)ent->qid.path == Qdata1)
		length = qlen(pp->q[1]);
	else
		length = ent->length;

	mkqid(&qid, PIPEQID(PIPEID(c->qid.path), ent->qid.path), 0, QTFILE);
	devdir(c, qid, ent->name, length, eve, ent->perm, dp);
	return 1;
}
/*
 * Walk within a pipe directory.  When the walk produced a new
 * channel (a clone distinct from c) the pipe gains a reference.
 * If c was already open, a message is printed and the matching
 * data-file open count is bumped as well.
 */
static Walkqid*
pipewalk(Chan *c, Chan *nc, char **name, int nname)
{
	Walkqid *wq;
	Pipe *p;
	unsigned kind;

	wq = devwalk(c, nc, name, nname, pipedir, NPIPEDIR, pipegen);
	if(wq == nil || wq->clone == nil || wq->clone == c)
		return wq;

	p = c->aux;
	qlock(p);
	p->ref++;
	if(c->flag & COPEN){
		print("channel open in pipewalk\n");
		kind = PIPETYPE(c->qid.path);
		if(kind == Qdata0)
			p->qref[0]++;
		else if(kind == Qdata1)
			p->qref[1]++;
	}
	qunlock(p);
	return wq;
}
/*
 * Stat a pipe file: build a Dir for the file c refers to and pack
 * it into db (at most n bytes).  Data files report their queue
 * length.  Raises Eshortstat when the packed size is less than
 * BIT16SZ; otherwise returns the packed size.
 */
static int
pipestat(Chan *c, uchar *db, int n)
{
	Pipe *p;
	Dir dir;
	unsigned kind;
	int packed;

	p = c->aux;
	kind = PIPETYPE(c->qid.path);
	if(kind == Qdir)
		devdir(c, c->qid, ".", 0, eve, DMDIR|0555, &dir);
	else if(kind == Qdata0)
		devdir(c, c->qid, "data", qlen(p->q[0]), eve, 0600, &dir);
	else if(kind == Qdata1)
		devdir(c, c->qid, "data1", qlen(p->q[1]), eve, 0600, &dir);
	else if(kind == Qctl)
		devdir(c, c->qid, "ctl", 0, eve, 0600, &dir);
	else
		panic("pipestat");

	packed = convD2M(&dir, db, n);
	if(packed < BIT16SZ)
		error(Eshortstat);
	return packed;
}
/*
 * Open a pipe file.  The directory may only be opened OREAD.
 * Opening "data" or "data1" bumps the matching open count under the
 * pipe lock; no queue is created here (both exist since attach).
 *
 * The mode is validated with openmode() before any count is
 * touched: if openmode() raises an error for a bad mode (as the
 * stock Plan 9 kernel's does), COPEN is never set on c, so a count
 * taken beforehand would not be undone by pipeclose.
 */
static Chan*
pipeopen(Chan *c, int omode)
{
	Pipe *p;
	int mode;

	if(c->qid.type & QTDIR){
		if(omode != OREAD)
			error(Ebadarg);
		c->mode = omode;
		c->flag |= COPEN;
		c->offset = 0;
		return c;
	}

	mode = openmode(omode);

	p = c->aux;
	qlock(p);
	switch(PIPETYPE(c->qid.path)){
	case Qdata0:
		p->qref[0]++;
		break;
	case Qdata1:
		p->qref[1]++;
		break;
	}
	qunlock(p);

	c->mode = mode;
	c->flag |= COPEN;
	c->offset = 0;
	c->iounit = qiomaxatomic;
	return c;
}
/*
 * Close a channel on a pipe.
 * For an open data file the matching open count is dropped; when
 * it reaches zero the opposite queue is hung up and this side's
 * queue is closed.  Once neither side is open both queues are
 * reopened for reuse.  The pipe and its queues are freed when the
 * last channel reference goes away.
 */
static void
pipeclose(Chan *c)
{
	Pipe *p;
	int side, last;

	p = c->aux;
	qlock(p);

	if(c->flag & COPEN){
		/* work out which end this channel is, if either */
		switch(PIPETYPE(c->qid.path)){
		case Qdata0:
			side = 0;
			break;
		case Qdata1:
			side = 1;
			break;
		default:
			side = -1;
			break;
		}
		/* closing either side hangs up the stream */
		if(side >= 0){
			p->qref[side]--;
			if(p->qref[side] == 0){
				qhangup(p->q[1-side], 0);
				qclose(p->q[side]);
			}
		}
	}

	/* if both sides are closed, they are reusable */
	if(p->qref[0] == 0 && p->qref[1] == 0){
		qreopen(p->q[0]);
		qreopen(p->q[1]);
	}

	/* free the structure on last close, after dropping the lock */
	p->ref--;
	last = (p->ref == 0);
	qunlock(p);
	if(last){
		free(p->q[0]);
		free(p->q[1]);
		free(p);
	}
}
/*
 * Read from a pipe file.
 * The directory is read through devdirread; "data" reads q[0] and
 * "data1" reads q[1].  The ctl file has nothing to read and returns
 * 0 rather than falling into the panic, since it can be opened for
 * reading (mode 0600 in pipedir).  Any other type is a kernel bug
 * and panics.
 */
static long
piperead(Chan *c, void *va, long n, vlong)
{
	Pipe *p;

	p = c->aux;
	switch(PIPETYPE(c->qid.path)){
	case Qdir:
		return devdirread(c, va, n, pipedir, NPIPEDIR, pipegen);
	case Qdata0:
		return qread(p->q[0], va, n);
	case Qdata1:
		return qread(p->q[1], va, n);
	case Qctl:
		return 0;
	default:
		panic("piperead");
	}
	return -1;	/* not reached */
}
/*
 * Block read: "data" takes a block from q[0], "data1" from q[1];
 * every other file is handed to devbread.
 */
static Block*
pipebread(Chan *c, long n, ulong offset)
{
	Pipe *p;
	unsigned kind;

	p = c->aux;
	kind = PIPETYPE(c->qid.path);
	if(kind == Qdata0)
		return qbread(p->q[0], n);
	if(kind == Qdata1)
		return qbread(p->q[1], n);
	return devbread(c, n, offset);
}
/*
 * Handler installed in a process's Fastcall table by the
 * "fastwrite" ctl command (see pipewrite).
 * It only accounts for the write: the channel offset is advanced
 * by n and n is returned; no data is queued.  A negative n panics.
 * The code that used to follow the return (a bounded qwrite of a
 * zeroed buffer) was unreachable and has been dropped.
 * NOTE(review): the offset update is done without a lock or atomic
 * operation -- confirm that is acceptable for the callers.
 */
static long
pipefastwrite(Chan *c, void *, long n)
{
	if(n < 0)
		panic("pipefastwrite len < 0");
	c->offset += n;
	return n;
}
/*
 * a write to a closed pipe causes a note to be sent to
 * the process.
 *
 * Writes to "data" go to q[1] and writes to "data1" go to q[0].
 * A write to "ctl" is parsed as a command; the only one accepted is
 *	fastwrite syscall fd pointer len
 * which appends an entry (syscall number, the channel behind fd,
 * pipefastwrite, pointer, len) to the calling process's Fastcall
 * table.  fd must be open for writing on this same pipe.
 *
 * NOTE(review): every error raised here, including a bad ctl
 * command, passes through the outer handler and so also posts the
 * "write on closed pipe" note -- confirm that is intended for ctl.
 */
static long
pipewrite(Chan *c, void *va, long n, vlong)
{
	Pipe *p;
	char *s = nil;
	/* cmd syscall fd pointer len */
	char *tok[5];
	Fastcall f;
	int ntok;
	Fastcall *nfc;
	Chan *fc;
	char *cmd;
	int syscall, fd, len;
	void *ubuf;

	if(!islo())
		print("pipewrite hi %#p\n", getcallerpc(&c));
	if(waserror()) {
		/* avoid notes when pipe is a mounted queue */
		if((c->flag & CMSG) == 0)
			postnote(up, 1, "sys: write on closed pipe", NUser);
		nexterror();
	}
	p = c->aux;
	switch(PIPETYPE(c->qid.path)){
	case Qdata0:
		n = qwrite(p->q[1], va, n);
		break;
	case Qdata1:
		n = qwrite(p->q[0], va, n);
		break;
	case Qctl:
		print("hey Mr. Ctl file\n");
		/* private, NUL-terminated copy of the command for tokenize */
		s = malloc(n + 1);
		if(s == nil)
			error("devpipe: no memory for ctl command");
		if(waserror()) {
			free(s);
			nexterror();
		}
		print("malloc'd...");
		memmove(s, va, n);
		print("memmove'd...");
		s[n] = 0;
		ntok = tokenize(s, tok, nelem(tok));
		print("tokenized!\n");
		if(ntok != 5 || strcmp(tok[0], "fastwrite") != 0)
			error("devpipe: bad command");

		cmd = tok[0];
		syscall = atoi(tok[1]);
		fd = atoi(tok[2]);
		ubuf = (void *)strtoul(tok[3], 0, 0);
		len = strtoul(tok[4], 0, 0);
		print("command = %s, tokenized to (ntok = %d) cmd: %s, syscall: %d, fd: %d, va: %p, len: %d\n", s, ntok, cmd, syscall, fd, ubuf, len);
		print("gimme a fastwrite on pipe %p\n", p);
		/*
		 * NOTE(review): the test accepts syscall == nsyscall although
		 * the message says "> nsyscall" -- confirm which is meant.
		 */
		if(syscall < nsyscall)
			error("syscall must be > nsyscall");
		/* validate the address before the fd -- simpler error */
		if(len < 0)
			error("len < 0");
		validaddr((unsigned long)ubuf, len, 0);
		print("validaddr\n");
		fc = fdtochan(fd, OWRITE, 1, 1);
		print("fc %p\n", fc);
		if(waserror()) {
			cclose(fc);
			nexterror();
		}
		if(fc->aux != c->aux)
			error("fd is not for same pipe as ctl fd");

		/*
		 * Grow the table before touching any reference count, so a
		 * failed allocation leaves the pipe's counts unchanged.
		 */
		nfc = malloc(sizeof(Fastcall)*(up->fcount + 1));
		if(nfc == nil)
			error("devpipe: no memory for fastcall table");
		memmove(nfc, up->fc, sizeof(Fastcall)*up->fcount);

		/* figure out which q it is, and inc the ref. */
		f.scnum = syscall;
		f.c = fc;
		qlock(p);
		switch(PIPETYPE(fc->qid.path)){
		case Qdata0:
			p->qref[0]++;
			break;
		case Qdata1:
			p->qref[1]++;
			break;
		}
		qunlock(p);
		f.fun = pipefastwrite;
		f.buf = ubuf;
		f.n = len;
		print("fcount = %d, giving scnum = %d, chan = %p, function = %p\n", up->fcount, f.scnum, f.c, f.fun);
		nfc[up->fcount] = f;
		/*
		 * NOTE(review): the previous up->fc table is not freed here;
		 * whether it may be shared with another process is not
		 * visible from this file -- confirm ownership before freeing.
		 */
		up->fc = nfc;
		up->fcount++;
		print("fcount now %d\n", up->fcount);
		/* pop the fc, s and outer handlers; fc stays referenced by the table */
		poperror();
		poperror();
		poperror();
		free(s);
		return n;
	default:
		panic("pipewrite");
	}
	poperror();
	return n;
}
/*
 * Block write: a block written to "data" is queued on q[1] and one
 * written to "data1" on q[0]; any other file type panics.
 * If the queue write raises an error, a "write on closed pipe" note
 * is posted to the process (unless the channel has CMSG set) before
 * the error is passed on.
 */
static long
pipebwrite(Chan *c, Block *bp, ulong)
{
	Pipe *p;
	unsigned kind;
	long nw;

	if(waserror()) {
		/* avoid notes when pipe is a mounted queue */
		if(!(c->flag & CMSG))
			postnote(up, 1, "sys: write on closed pipe", NUser);
		nexterror();
	}

	p = c->aux;
	kind = PIPETYPE(c->qid.path);
	if(kind == Qdata0)
		nw = qbwrite(p->q[1], bp);
	else if(kind == Qdata1)
		nw = qbwrite(p->q[0], bp);
	else{
		nw = 0;
		panic("pipebwrite");
	}

	poperror();
	return nw;
}
/*
 * Device table entry for the pipe device: device character '|',
 * name "pipe", then the operation handlers.  The pipe* entries are
 * defined in this file; the dev*-prefixed entries (devreset,
 * devshutdown, devcreate, devremove, devwstat) are not defined here.
 */
Dev pipedevtab = {
'|',
"pipe",
devreset,
pipeinit,
devshutdown,
pipeattach,
pipewalk,
pipestat,
pipeopen,
devcreate,
pipeclose,
piperead,
pipebread,
pipewrite,
pipebwrite,
devremove,
devwstat,
};
|