Plan 9 from Bell Labs’s /usr/web/sources/extra/9hist/port/stream.c

Copyright © 2021 Plan 9 Foundation.
Distributed under the MIT License.
Download the Plan 9 distribution.


## diffname port/stream.c 1990/0227
## diff -e /dev/null /n/bootesdump/1990/0227/sys/src/9/mips/stream.c
0a
#include	"u.h"
#include	"lib.h"
#include	"mem.h"
#include	"dat.h"
#include	"fns.h"
#include	"io.h"
#include	"errno.h"
#include	"devtab.h"

/*
 *  process end line discipline.  stputq and nullput are its two put
 *  routines; allocq() installs a Qinfo's iput on the read queue and its
 *  oput on the write queue (presumably the first two initializers below,
 *  in that order -- confirm against the Qinfo declaration).
 */
static void stputq(Queue*, Block*);
Qinfo procinfo = { stputq, nullput, 0, 0, "process" } ;
extern Qinfo noetherinfo;

/*
 *  zero-terminated table of the line disciplines that can be pushed by
 *  name; searched by qinfofind()
 */
static Qinfo *lds[] = {
	&noetherinfo,
	0
};

enum {
	Nclass=4,	/* number of block classes, i.e. entries in bclass[] */
};

/*
 *  All stream structures are ialloc'd at boot time (see streaminit)
 */
Stream *slist;	/* conf.nstream entries */
Queue *qlist;	/* conf.nqueue entries, handed out in adjacent pairs by allocq */
Block *blist;	/* conf.nblock block headers */
static Lock garbagelock;	/* NOTE(review): not referenced by the code in this version */

/*
 *  The block classes.  There are Nclass block sizes, each with its own free list.
 *  All are ialloced at streaminit() time.
 */
typedef struct {
	int	size;	/* bytes of data area per block of this class */
	Queue;		/* unnamed member: provides the free list, its lock and r */
} Bclass;
Bclass bclass[Nclass]={
	{ 0 },		/* no data area is allocated for this class */
	{ 64 },
	{ 512 },
	{ 4096 },
};

/*
 *  Allocate streams, queues, and blocks.  Allocate n block classes with
 *	1/2(m+1) to class m < n-1
 *	1/2(n-1) to class n-1
 */
void
streaminit(void)
{
	int class, i, n;
	Block *bp;
	Bclass *bcp;

	slist = (Stream *)ialloc(conf.nstream * sizeof(Stream), 0);
	qlist = (Queue *)ialloc(conf.nqueue * sizeof(Queue), 0);
	blist = (Block *)ialloc(conf.nblock * sizeof(Block), 0);
	bp = blist;
	n = conf.nblock;
	for(class = 0; class < Nclass; class++){
		if(class < Nclass-1)
			n = n/2;
		bcp = &bclass[class];
		for(i = 0; i < n; i++) {
			if(bcp->size)
				bp->base = (uchar *)ialloc(bcp->size, 0);
			bp->lim = bp->base + bcp->size;
			bp->flags = class;
			freeb(bp);
			bp++;
		}
	}
}


/*
 *  sleep condition for allocb: true once the class free list is non-empty
 */
static int
isblock(void *arg)
{
	return ((Bclass *)arg)->first != 0;
}

/*
 *  Allocate a block able to hold size bytes.
 *
 *  The smallest class whose size is at least `size' is used; if no class
 *  is big enough the largest class is used anyway, so callers must look
 *  at bp->lim to learn how much room they really got.  When the class
 *  free list is empty the caller sleeps until freeb() returns a block.
 *
 *  The block comes back empty (rptr == wptr == base), unlinked, typed
 *  M_DATA and with every flag but the class bits cleared.
 */
Block *
allocb(ulong size)
{
	Bclass *cl;
	Block *blk;

	/*
	 *  map size to class
	 */
	cl = bclass;
	while(cl < &bclass[Nclass-1] && cl->size < size)
		cl++;

	/*
	 *  take the head of the free list, waiting for one if need be
	 */
	lock(cl);
	for(;;){
		blk = cl->first;
		if(blk != 0)
			break;
		unlock(cl);
		print("waiting for blocks\n");
		sleep(&cl->r, isblock, (void *)cl);
		lock(cl);
	}
	cl->first = blk->next;
	if(blk->next == 0)
		cl->last = 0;
	unlock(cl);

	/*
	 *  hand back an empty block
	 */
	blk->next = 0;
	blk->type = M_DATA;
	blk->flags &= S_CLASS;
	blk->wptr = blk->base;
	blk->rptr = blk->base;
	return blk;
}

/*
 *  Return a block to the free list of its class (the class number is kept
 *  in the S_CLASS bits of flags) and wake anyone sleeping in allocb().
 *  rptr and wptr are zeroed so that a use after free faults.
 */
void
freeb(Block *bp)
{
	Bclass *cl;

	cl = &bclass[bp->flags & S_CLASS];
	bp->wptr = 0;
	bp->rptr = 0;

	lock(cl);
	if(cl->first == 0)
		cl->first = bp;
	else
		cl->last->next = bp;
	cl->last = bp;
	bp->next = 0;
	wakeup(&cl->r);
	unlock(cl);
}

/*
 *  allocate a pair of queues.  flavor them with the requested put routines.
 *  the `QINUSE' flag on the read side is the only one used to mark the
 *  pair as taken.
 *
 *  Queues live in qlist as adjacent (read, write) pairs, so the table is
 *  scanned two entries at a time.  Returns the read queue; raises
 *  Enoqueue when no pair is free.
 */
static Queue *
allocq(Qinfo *qi)
{
	Queue *q, *wq, *end;

	/*
	 *  Require room for a whole pair: with an odd conf.nqueue a plain
	 *  `q < end' scan would step over the end marker and the exhaustion
	 *  test below would miss it.
	 */
	end = &qlist[conf.nqueue];
	for(q=qlist; q+1 < end; q++, q++) {
		if(q->flag == 0){
			if(canlock(q)){
				/* recheck under the lock: someone may have just taken it */
				if(q->flag == 0)
					break;
				unlock(q);
			}
		}
	}

	if(q+1 >= end){
		print("no more queues\n");
		error(0, Enoqueue);
	}

	q->flag = QINUSE;
	q->r.p = 0;
	q->info = qi;
	q->put = qi->iput;
	wq = q->other = q + 1;

	/*
	 *  freeq() only clears the read side flag, so QHUNGUP or QHIWAT set
	 *  on the write side by the previous owner would otherwise leak into
	 *  the new stream (streamwrite refuses to write when it sees QHUNGUP).
	 */
	wq->flag = 0;
	wq->r.p = 0;
	wq->info = qi;
	wq->put = qi->oput;
	wq->other = q;

	unlock(q);

	return q;
}

/*
 *  Release a queue pair: drain and free every block still queued on the
 *  read side and then the write side, and mark the pair free by clearing
 *  the read side's flag.  q may be either member of the pair.
 */
static void
freeq(Queue *q)
{
	Queue *rq, *wq;
	Block *bp;

	rq = RD(q);
	for(bp = getq(rq); bp != 0; bp = getq(rq))
		freeb(bp);

	wq = WR(rq);
	for(bp = getq(wq); bp != 0; bp = getq(wq))
		freeb(bp);

	RD(wq)->flag = 0;
}

/*
 *  push a queue onto a stream referenced by the proc side write q
 *
 *  A new queue pair for line discipline qi is allocated and spliced in
 *  directly below the process-end pair; qi->open, if any, is then called
 *  with the new read queue and the stream.  Returns the write queue that
 *  follows the new pair's write side.  allocq() may raise an error.  No
 *  lock is taken here.
 */
Queue *
pushq(Stream* s, Qinfo *qi)
{
	Queue *q;
	Queue *nq;

	q = RD(s->procq);

	/*
	 *  make the new queue
	 */
	nq = allocq(qi);

	/*
	 *  push
	 */
	RD(nq)->next = q;			/* new read side feeds the process read queue */
	RD(WR(q)->next)->next = RD(nq);		/* old top module's read side now feeds the new one */
	WR(nq)->next = WR(q)->next;		/* new write side feeds the old top module */
	WR(q)->next = WR(nq);			/* process write side feeds the new module */

	if(qi->open)
		(*qi->open)(RD(nq), s);

	return WR(nq)->next;
}

/*
 *  pop off the top line discipline
 *
 *  Raises Ebadld when the queue below the process end is the device's
 *  write queue, i.e. there is nothing to pop.  Otherwise the module's
 *  close routine (if any) is called with its read queue, the pair is
 *  unlinked from both directions of the stream, and it is freed.
 */
static void
popq(Stream *s)
{
	Queue *q;

	if(s->procq->next == WR(s->devq))
		error(0, Ebadld);
	q = s->procq->next;			/* write side of the module being popped */
	if(q->info->close)
		(*q->info->close)(RD(q));
	s->procq->next = q->next;		/* bypass it on the way down */
	RD(q->next)->next = RD(s->procq);	/* and on the way up */
	freeq(q);
}

/*
 *  Append a block, or a chain of blocks linked through next, to the tail
 *  of a queue under the queue lock.  The byte count of every block is
 *  added to q->len, and QHIWAT is raised once the total reaches Streamhi.
 *
 *  Returns non-zero if any appended block carried S_DELIM.
 */
int
putq(Queue *q, Block *bp)
{
	Block *tail;
	int sawdelim;

	lock(q);
	if(q->first == 0)
		q->first = bp;
	else
		q->last->next = bp;

	/* account for each block while walking to the end of the chain */
	sawdelim = 0;
	for(tail = bp; ; tail = tail->next){
		q->len += tail->wptr - tail->rptr;
		sawdelim |= tail->flags & S_DELIM;
		if(tail->next == 0)
			break;
	}
	q->last = tail;

	if(q->len >= Streamhi)
		q->flag |= QHIWAT;
	unlock(q);
	return sawdelim;
}
/*
 *  Unlocked variant of putq for a plain block list: append a block or
 *  chain of blocks and add their sizes to q->len.  No high water mark is
 *  maintained.  Returns non-zero if any appended block carried S_DELIM.
 */
int
putb(Blist *q, Block *bp)
{
	Block *tail;
	int sawdelim;

	if(q->first == 0)
		q->first = bp;
	else
		q->last->next = bp;

	sawdelim = 0;
	for(tail = bp; ; tail = tail->next){
		q->len += tail->wptr - tail->rptr;
		sawdelim |= tail->flags & S_DELIM;
		if(tail->next == 0)
			break;
	}
	q->last = tail;
	tail->next = 0;
	return sawdelim;
}

/*
 *  Put a single block back at the head of a queue, under the queue lock,
 *  and add its size to q->len.  When the queue is empty the block also
 *  becomes the tail and its next pointer is left as the caller set it.
 */
static void
putbq(Blist *q, Block *bp)
{
	Block *head;

	lock(q);
	head = q->first;
	if(head == 0)
		q->last = bp;
	else
		bp->next = head;
	q->first = bp;
	q->len += bp->wptr - bp->rptr;
	unlock(q);
}

/*
 *  remove the first block from a queue 
 *
 *  Returns the block with its next pointer cleared, or 0 if the queue is
 *  empty.  q->len is reduced by the block's byte count.  If the queue had
 *  been marked QHIWAT and has now drained below half of Streamhi, the
 *  mark is cleared and the queue feeding this one is woken (flowctl()
 *  sleeps on that queue's r).
 */
Block *
getq(Queue *q)
{
	Block *bp;

	lock(q);
	bp = q->first;
	if(bp) {
		q->first = bp->next;
		if(q->first == 0)
			q->last = 0;
		q->len -= bp->wptr - bp->rptr;
		if((q->flag&QHIWAT) && q->len < Streamhi/2){
			/*
			 *  other -> next -> other walks to the queue that puts into q.
			 *  NOTE(review): this dereferences q->other->next, which
			 *  streamnew() sets to 0 at both ends of a stream -- confirm
			 *  QHIWAT can never be set on such a queue.
			 */
			wakeup(&q->other->next->other->r);
			q->flag &= ~QHIWAT;
		}
		bp->next = 0;
	}
	unlock(q);
	return bp;
}
/*
 *  Unlocked variant of getq for a plain block list: detach and return the
 *  first block (next cleared, q->len reduced), or 0 if the list is empty.
 */
Block *
getb(Blist *q)
{
	Block *head;

	head = q->first;
	if(head == 0)
		return 0;

	q->first = head->next;
	if(head->next == 0)
		q->last = 0;
	q->len -= head->wptr - head->rptr;
	head->next = 0;
	return head;
}

/*
 *  put a block into the bit bucket
 *
 *  Frees bp and then raises Ehungup.  q is not used.  Installed as the
 *  second put routine of procinfo.
 */
void
nullput(Queue *q, Block *bp)
{
	freeb(bp);		/* free first: error() is the last thing done here */
	error(0, Ehungup);
}

/*
 *  Look up the pushable line discipline called `name' in the
 *  zero-terminated lds table and return its Qinfo.  Raises Ebadld when
 *  name is 0 or no entry matches.
 */
static Qinfo *
qinfofind(char *name)
{
	int i;

	if(name == 0)
		error(0, Ebadld);
	for(i = 0; lds[i] != 0; i++){
		if(strcmp(lds[i]->name, name) != 0)
			continue;
		return lds[i];
	}
	error(0, Ebadld);
}

/*
 *  Start a hangup from the device end: build an empty M_HANGUP block and
 *  hand it to the device queue's put routine.
 */
static void
hangup(Stream *s)
{
	Queue *dq;
	Block *msg;

	msg = allocb(0);
	msg->type = M_HANGUP;
	dq = s->devq;
	(*dq->put)(dq, msg);
}

/*
 *  Test whether the text in bp starts with the word `name'.
 *
 *  Returns 1 on a match and advances bp->rptr past the word: past the
 *  following space too if there is one, otherwise up to the terminating
 *  null.  Returns 0, leaving bp untouched, when the block is shorter than
 *  name, the prefix differs, or name is followed by anything other than
 *  a space or a null.
 *
 *  it is assumed that the block data is null terminated.  streamwrite
 *  guarantees this.
 */
int
streamparse(char *name, Block *bp)
{
	int len;

	len = strlen(name);
	if(bp->wptr - bp->rptr < len)
		return 0;
	if(strncmp(name, (char *)bp->rptr, len) != 0)
		return 0;

	/* the character right after the word decides the outcome */
	switch(bp->rptr[len]){
	case ' ':
		bp->rptr += len+1;
		return 1;
	case 0:
		bp->rptr += len;
		return 1;
	default:
		return 0;
	}
}

/*
 *  the per stream directory structure
 *
 *  One row per file that every stream device provides.  streamgen()
 *  reads the name, qid, length and perm members of these entries
 *  (presumably the four columns below, in that order -- confirm against
 *  the Dirtab declaration).
 */
Dirtab streamdir[]={
	"data",		Sdataqid,	0,			0600,
	"ctl",		Sctlqid,	0,			0600,
};

/*
 *  A stream device consists of the contents of streamdir plus
 *  any directory supplied by the actual device.
 *
 *  values of s:
 * 	0 to ntab-1 apply to the auxiliary directory.
 *	ntab to ntab+Shighqid-Slowqid apply to streamdir.
 *
 *  Fills in *dp for entry s and returns 1, or returns -1 when s is
 *  outside both ranges.
 *
 *  NOTE(review): the streamdir range is computed from Shighqid-Slowqid+1;
 *  confirm that it equals the number of rows in streamdir.
 */
int
streamgen(Chan *c, Dirtab *tab, int ntab, int s, Dir *dp)
{
	/* a negative index would read before the caller's table */
	if(s < 0)
		return -1;

	if(s < ntab)
		tab = &tab[s];
	else if(s < ntab + Shighqid - Slowqid + 1)
		tab = &streamdir[s - ntab];
	else
		return -1;

	/* the qid combines this channel's stream id with the entry's own qid */
	devdir(c, STREAMQID(STREAMID(c->qid),tab->qid), tab->name, tab->length,
		tab->perm, dp);
	return 1;
}

/*
 *  create a new stream
 *
 *  Claims a free Stream, optionally ties it to channel c, and builds the
 *  minimal stream: a process-end queue pair (procinfo) on top of a
 *  device-end pair (qi).  qi->open, if any, is called on the device read
 *  queue.  Returns the stream with inuse set to 1.
 *
 *  c may be 0, in which case the stream belongs to no channel and its
 *  type is set to -1.  Raises Enostream when the table is full and
 *  passes on any error from allocq() or qi->open.
 */
Stream *
streamnew(Chan *c, Qinfo *qi)
{
	Stream *s;
	Queue *q;

	/*
	 *  find a free stream struct
	 */
	for(s = slist; s < &slist[conf.nstream]; s++) {
		if(s->inuse == 0){
			if(canlock(s)){
				/* recheck under the lock */
				if(s->inuse == 0)
					break;
				unlock(s);
			}
		}
	}
	if(s == &slist[conf.nstream]){
		print("no more streams\n");
		error(0, Enostream);
	}
	if(waserror()){
		unlock(s);
		/* there is no channel to close when the caller passed none */
		if(c)
			streamclose(c);
		nexterror();
	}

	/*
	 *  marry a stream and a channel
	 */
	if(c){
		c->stream = s;
		s->type = c->type;
		s->dev = c->dev;
		s->id = STREAMID(c->qid);
	} else
		s->type = -1;

	/*
 	 *  hang a device and process q off the stream
	 */
	s->inuse = 1;
	s->tag[0] = 0;
	q = allocq(&procinfo);
	s->procq = WR(q);
	q = allocq(qi);
	s->devq = RD(q);
	WR(s->procq)->next = WR(s->devq);	/* downstream: process -> device */
	RD(s->procq)->next = 0;
	RD(s->devq)->next = RD(s->procq);	/* upstream: device -> process */
	WR(s->devq)->next = 0;

	if(qi->open)
		(*qi->open)(RD(s->devq), s);

	/* c is optional (see above), so only mark it open when it exists */
	if(c)
		c->flag |= COPEN;
	unlock(s);
	poperror();
	return s;
}

/*
 *  Open the stream named by channel c.
 *
 *  A stream is identified by (type, dev, stream id).  If one is already
 *  in use, its reference count is bumped and c is attached to it;
 *  otherwise a new stream is built with device line discipline qi.
 */
void
streamopen(Chan *c, Qinfo *qi)
{
	Stream *s;

	for(s = slist; s < &slist[conf.nstream]; s++) {
		/* cheap unlocked filter first */
		if(!s->inuse || s->type != c->type || s->dev != c->dev
		   || s->id != STREAMID(c->qid))
			continue;

		/* confirm under the lock before taking a reference */
		lock(s);
		if(s->inuse && s->type == c->type
		&& s->dev == c->dev
		&& s->id == STREAMID(c->qid)){
			s->inuse++;
			c->stream = s;
			unlock(s);
			return;
		}
		unlock(s);
	}

	/*
	 *  no such stream yet
	 */
	streamnew(c, qi);
}

/*
 *  Drop channel c's reference to its stream.  Channels that were never
 *  opened (COPEN clear) are ignored.  When the last reference goes away
 *  every module's close routine is called, top to bottom, and then every
 *  queue pair is freed, bottom to top, and the stream's identity is
 *  cleared.
 */
void
streamclose(Chan *c)
{
	Stream *s;
	Queue *q, *up;

	if((c->flag & COPEN) == 0)
		return;

	s = c->stream;
	lock(s);
	s->inuse--;
	if(s->inuse != 0){
		unlock(s);
		return;
	}

	/*
	 *  walk down the write side calling close routines; stop after the
	 *  device's write queue
	 */
	q = s->procq;
	while(q != 0){
		if(q->info->close)
			(*q->info->close)(q->other);
		if(q == s->devq->other)
			break;
		q = q->next;
	}

	/*
	 *  walk up the read side freeing queue pairs; next is saved first
	 *  because the pair is gone after freeq
	 */
	q = s->devq;
	while(q != 0){
		up = q->next;
		freeq(q);
		q = up;
	}

	s->type = 0;
	s->dev = 0;
	s->id = 0;
	unlock(s);
}

/*
 *  put a block (or list of blocks) to be read into the queue.  wakeup
 *  any waiting reader.
 *
 *  This is the read side put routine of the process end (procinfo).  An
 *  M_HANGUP block is consumed and marks both queues of the pair QHUNGUP;
 *  anything else is appended to q for streamread() to collect.
 */
void
stputq(Queue *q, Block *bp)
{
	if(bp->type == M_HANGUP){
		freeb(bp);
		q->flag |= QHUNGUP;
		q->other->flag |= QHUNGUP;
	} else {
		lock(q);
		if(q->first)
			q->last->next = bp;
		else
			q->first = bp;
		/*
		 *  walk to the real tail: recording only the first block as
		 *  q->last would drop the rest of a chain on the next put and
		 *  leave their bytes out of q->len
		 */
		q->len += bp->wptr - bp->rptr;
		while(bp->next){
			bp = bp->next;
			q->len += bp->wptr - bp->rptr;
		}
		q->last = bp;
		if(q->len >= Streamhi)
			q->flag |= QHIWAT;
		unlock(q);
	}
	wakeup(&q->r);
}

/*
 *  read a string.  update the offset accordingly.
 *
 *  Copies up to n bytes of str, starting at the channel's current offset,
 *  to the start of buf.  Returns the number of bytes copied, which is 0
 *  once the offset is at or beyond the end of the string.
 */
long
stringread(Chan *c, uchar *buf, long n, char *str)
{
	long avail;

	avail = strlen(str);
	avail -= c->offset;
	if(avail < n)
		n = avail;
	if(n <= 0)
		return 0;
	/* the offset selects where to read in str; buf is always filled from its start */
	memcpy(buf, str + c->offset, n);
	c->offset += n;
	return n;
}

/*
 *  return true if there is an output buffer available
 */
static int
isinput(void *x)
{
	return ((Queue *)x)->first != 0;
}

/*
 *  read until we fill the buffer or until a DELIM is encountered
 *
 *  Reads of the ctl file return the stream id as a decimal string; reads
 *  of a directory go to devdirread(); any other non-data qid panics.
 *  Data reads hold s->rdlock for their whole duration.  Returns the
 *  number of bytes placed in vbuf, which may be short if the stream has
 *  been hung up.
 */
long
streamread(Chan *c, void *vbuf, long n)
{
	Block *bp;
	Stream *s;
	Queue *q;
	long rv = 0;
	int left, i, x;
	uchar *buf = vbuf;
	char num[32];

	s = c->stream;
	switch(STREAMTYPE(c->qid)){
	case Sdataqid:
		break;
	case Sctlqid:
		sprint(num, "%d", s->id);
		return stringread(c, buf, n, num);
	default:
		if(CHDIR & c->qid)
			return devdirread(c, vbuf, n, 0, 0, streamgen);
		else
			panic("streamread");
	}

	/*
	 *  one reader at a time
	 */
	qlock(&s->rdlock);
	if(waserror()){
		/* an error raised while reading must not leave rdlock held */
		qunlock(&s->rdlock);
		nexterror();
	}

	/*
	 *  sleep till data is available
	 */
	q = RD(s->procq);
	left = n;
	while(left){
		bp = getq(q);
		if(bp == 0){
			/* nothing queued: give up after a hangup, otherwise wait */
			if(q->flag & QHUNGUP)
				break;
			sleep(&q->r, &isinput, (void *)q);
			continue;
		}

		i = bp->wptr - bp->rptr;
		if(i <= left){
			/* the whole block fits */
			memcpy(buf, bp->rptr, i);
			left -= i;
			buf += i;
			if(bp->flags & S_DELIM){
				/* a delimiter ends the read even if the buffer is not full */
				freeb(bp);
				break;
			} else
				freeb(bp);
		} else {
			/* take what fits and put the remainder back at the head */
			memcpy(buf, bp->rptr, left);
			bp->rptr += left;
			putbq(q, bp);
			left = 0;
		}
	};

	qunlock(&s->rdlock);
	poperror();
	return n - left;	
}

/*
 *  Handle a ctl request.  Streamwide requests are:
 *
 *	hangup			-- send an M_HANGUP up the stream
 *	push ldname		-- push the line discipline named ldname
 *	pop			-- pop a line discipline
 *
 *  Anything else is passed down the stream as a delimited M_CTL block.
 *
 *  This routine is entered with s->wrlock held; it does not release it,
 *  the caller (streamwrite) does.  Returns n.
 *
 *  The private copy of the request is released before, or protected
 *  around, every call that can raise an error, so a failed request does
 *  not strand a block.
 *
 *  NOTE(review): allocb() falls back to the largest block class when n+1
 *  exceeds it, so a request longer than that class would overrun the
 *  block in the memcpy below -- confirm that n is bounded.
 */
static long
streamctlwrite(Stream *s, void *a, long n)
{
	Qinfo *qi;
	Block *bp;

	/*
	 *  package: copy the request and null terminate it for streamparse
	 */
	bp = allocb(n+1);
	memcpy(bp->wptr, a, n);
	bp->wptr[n] = 0;
	bp->wptr += n + 1;

	/*
	 *  check for standard requests
	 */
	if(streamparse("hangup", bp)){
		/* hangup() does not look at the request, so drop it first */
		freeb(bp);
		hangup(s);
	} else if(streamparse("push", bp)){
		/* the name lives in bp, so keep it until the lookup is done */
		if(waserror()){
			freeb(bp);
			nexterror();
		}
		qi = qinfofind((char *)bp->rptr);
		poperror();
		freeb(bp);
		pushq(s, qi);
	} else if(streamparse("pop", bp)){
		freeb(bp);
		popq(s);
	} else {
		/* not ours: ownership of bp passes to the next queue */
		bp->type = M_CTL;
		bp->flags |= S_DELIM;
		PUTNEXT(s->procq, bp);
	}

	return n;
}

/*
 *  sleep condition for flowctl: true while the queue holds fewer than
 *  Streamhi bytes
 */
static int
notfull(void *arg)
{
	return ((Queue *)arg)->len < Streamhi;
}

/*
 *  Block the caller while the queue after q is at or above Streamhi.
 *  The sleep is on q's own r; getq() on the next queue wakes it.
 */
void
flowctl(Queue *q)
{
	Queue *down;

	down = q->next;
	if(down->len < Streamhi)
		return;
	sleep(&q->r, notfull, down);
}

/*
 *  send the request as a single delimited block
 *
 *  Writes to the ctl file are handed to streamctlwrite(); writes to any
 *  qid other than data or ctl panic.  Data writes raise Ehungup once the
 *  stream has been hung up.  s->wrlock is held for the whole call.
 *  Returns n.
 *
 *  Data larger than one block is sent as several blocks, only the last
 *  of which is marked S_DELIM.
 */
long
streamwrite(Chan *c, void *a, long n)
{
	Stream *s;
	Block *bp;
	Queue *q;
	long rem;
	int i;

	/*
	 *  one writer at a time
	 */
	s = c->stream;
	qlock(&s->wrlock);
	if(waserror()){
		/* an error raised while writing must not leave wrlock held */
		qunlock(&s->wrlock);
		nexterror();
	}

	/*
	 *  decode the qid
	 */
	switch(STREAMTYPE(c->qid)){
	case Sdataqid:
		break;
	case Sctlqid:
		n = streamctlwrite(s, a, n);
		qunlock(&s->wrlock);
		poperror();
		return n;
	default:
		panic("bad stream qid\n");
	}

	/*
	 *  No writes allowed on hungup channels
	 */
	q = s->procq;
	if(q->other->flag & QHUNGUP)
		error(0, Ehungup);

	if(GLOBAL(a) || n==0){
		/*
		 *  `a' is global to the whole system, just create a
		 *  pointer to it and pass it on.
		 */
		flowctl(q);
		bp = allocb(0);
		/* aim the block at the caller's data instead of copying it */
		bp->rptr = bp->base = (uchar *)a;
		bp->wptr = bp->lim = (uchar *)a+n;
		bp->flags |= S_DELIM;
		bp->type = M_DATA;
		PUTNEXT(q, bp);
	} else {
		/*
		 *  `a' is in the user's address space, copy it into
		 *  system buffers and pass the buffers on.
		 */
		for(rem = n; ; rem -= i) {
			flowctl(q);
			bp = allocb(rem);
			i = bp->lim - bp->wptr;		/* room in the block actually obtained */
			if(i >= rem){
				/* the rest fits: this is the final, delimited block */
				memcpy(bp->wptr, a, rem);
				bp->flags |= S_DELIM;
				bp->wptr += rem;
				bp->type = M_DATA;
				PUTNEXT(q, bp);
				break;
			} else {
				/* fill this block and go round again for the remainder */
				memcpy(bp->wptr, a, i);
				bp->wptr += i;
				bp->type = M_DATA;
				PUTNEXT(q, bp);
				a = ((char*)a) + i;
			}
		}
	}
	qunlock(&s->wrlock);
	poperror();
	return n;
}
.
## diffname port/stream.c 1990/03013
## diff -e /n/bootesdump/1990/0227/sys/src/9/mips/stream.c /n/bootesdump/1990/03013/sys/src/9/mips/stream.c
606,607c
	s->id = s->dev = s->type = 0;
	s->inuse--;
	unlock(s);
.
602c
	for(q = s->devq; q; q = nq){
.
596c
		if(q == s->devq->other)
.
593c
	for(q = s->procq; q; q = q->next){
.
584,585c
	lock(s);
	if(s->inuse != 1){
		s->inuse--;
.
573a
	Stream *s = c->stream;
.
## diffname port/stream.c 1990/0312
## diff -e /n/bootesdump/1990/03013/sys/src/9/mips/stream.c /n/bootesdump/1990/0312/sys/src/9/mips/stream.c
888a
}

/*
 *  like andrew's getmfields but no hidden state
 */
int
getfields(char *lp,	/* to be parsed */
	char **fields,	/* where to put pointers */
	int n,		/* number of pointers */
	char sep	/* separator */
)
{
	int i;

	for(i=0; lp && *lp && i<n; i++){
		while(*lp == sep)
			*lp++=0;
		if(*lp == 0)
			break;
		fields[i]=lp;
		while(*lp && *lp != sep)
			lp++;
	}
	return i;
.
849c
	if((GLOBAL(a) && !docopy) || n==0){
.
809c
streamwrite(Chan *c, void *a, long n, int docopy)
.
719c
		i = BLEN(bp);
.
654,655c
	memcpy(buf, str + c->offset, n);
.
632d
630a
		q->len += BLEN(bp);
		while(bp->next) {
			bp = bp->next;
			q->len += BLEN(bp);
		}
.
624a
		wakeup(&q->other->r);
.
418c
	if(BLEN(bp) < len)
.
363a
 *  make sure the first block has n bytes
 */
Block *
pullup(Block *bp, int n)
{
	Block *nbp;
	int i;

	/*
	 *  Contract: returns the head of a list whose first block holds at
	 *  least n contiguous bytes, taking bytes from the following blocks
	 *  as needed (blocks emptied on the way are freed).  If the whole
	 *  list holds fewer than n bytes it is freed and 0 is returned.
	 */

	/*
	 *  this should almost always be true, the rest is
	 *  just to avoid every caller checking.
	 */
	if(BLEN(bp) >= n)
		return bp;

	/*
	 *  if not enough room in the first block,
	 *  add another to the front of the list.
	 */
	if(bp->lim - bp->rptr < n){
		nbp = allocb(n);
		nbp->next = bp;
		bp = nbp;
	}

	/*
	 *  copy bytes from the trailing blocks into the first;
	 *  n counts the bytes still missing
	 */
	n -= BLEN(bp);
	while(nbp = bp->next){
		i = BLEN(nbp);
		if(i >= n) {
			/* this block can supply everything still missing */
			memcpy(bp->wptr, nbp->rptr, n);
			bp->wptr += n;
			nbp->rptr += n;
			return bp;
		} else {
			/* swallow the whole block and keep going */
			memcpy(bp->wptr, nbp->rptr, i);
			bp->wptr += i;
			bp->next = nbp->next;
			nbp->next = 0;
			freeb(nbp);
			n -= i;
		}
	}
	freeb(bp);
	return 0;
}

/*
 *  grow the front of a list of blocks by n bytes
 *
 *  Returns the head of the list with n writable bytes before the old
 *  start of data.  If the first block has that much unused space in
 *  front of rptr it is used in place; otherwise a new block is put in
 *  front with its n bytes at the very end of its data area.
 */
Block *
prepend(Block *bp, int n)
{
	Block *nbp;

	if(bp->base && (bp->rptr - bp->base)>=n){
		/*
		 *  room in first block of message
		 */
		bp->rptr -= n;
		return bp;
	} else {
		/*
		 *  make new block sized for the request (not a fixed 2
		 *  bytes), put the new bytes at its end
		 */
		nbp = allocb(n);
		nbp->next = bp;
		nbp->wptr = nbp->lim;
		nbp->rptr = nbp->wptr - n;
		return nbp;
	}
}

/*
.
357c
		q->len -= BLEN(bp);
.
346a

/*
 *  remove the first block from a list of blocks
 */
.
337c
		q->len -= BLEN(bp);
.
324c
 *  remove the first block from a queue
.
319c
	q->len += BLEN(bp);
.
310c
void
.
303d
299c
		q->len += BLEN(bp);
.
295c
	q->len += BLEN(bp);
.
276c
		q->len += BLEN(bp);
.
272c
	q->len += BLEN(bp);
.
149a
	wakeup(&bcp->r);
.
147,148d
145a
	tries = 0;
	while(bp->next){
		if(++tries > 10){
			dumpstack();
			panic("freeb");
		}
		bp = bp->next;
	}
.
141a
	bp->rptr = bp->wptr = 0;
.
140d
137a
	int tries;
.
131,132c
 *  Free a block (or list of blocks).  Poison its pointers so that
 *  someone trying to access it after freeing will cause a dump.
.
111a
		qunlock(bcp);
.
110a
		qlock(bcp);
.
37c
	Blist;
	QLock;		/* qlock for sleepers on r */
	Rendez	r;	/* sleep here waiting for blocks */
.
15a
	&dkmuxinfo,
	&urpinfo,
.
13a
/*
 *  line disciplines that can be pushed
 *
 *  WARNING: this table should be the result of configuration
 */
extern Qinfo noetherinfo;
extern Qinfo dkmuxinfo;
extern Qinfo urpinfo;
.
11,12c
Qinfo procinfo = { stputq, nullput, 0, 0, "process" };
.
9a
/*
 *  process end line discipline
 */
.
## diffname port/stream.c 1990/0321
## diff -e /n/bootesdump/1990/0312/sys/src/9/mips/stream.c /n/bootesdump/1990/0321/sys/src/9/mips/stream.c
743c
	if(delim)
		wakeup(&q->r);
.
740a
			delim = 1;
		}
.
739c
		if(q->len >= Streamhi){
.
736a
			delim |= bp->flags & S_DELIM;
.
733a
		delim = bp->flags & S_DELIM;
.
727a
		delim = 0;
.
726a
		delim = 1;
.
720c
	int delim;
.
173c
	if(bcp->r.p)
		wakeup(&bcp->r);
.
92d
55,56c
	{ 68 },
	{ 260 },
.
34a
	for(i=0; i<Nlds && lds[i]; i++)
		if(lds[i] == qi)
			return;
	if(i == Nlds)
		panic("pushable");
	lds[i] = qi;
}

.
31,33c
void
newqinfo(Qinfo *qi)
{
	int i;
.
21,29c
static Qinfo *lds[Nlds+1];
.
18,19d
9a
enum {
	Nclass=4,	/* number of block classes */
	Nlds=32,	/* max number of pushable line disciplines */
};

.
## diffname port/stream.c 1990/0322
## diff -e /n/bootesdump/1990/0321/sys/src/9/mips/stream.c /n/bootesdump/1990/0322/sys/src/9/mips/stream.c
127,130c
		print("waiting for block %d\n", size);
		if(loop++ > 10)
			panic("waiting for blocks");
		qlock(&bcp->q);
		tsleep(&bcp->r, isblock, (void *)bcp, 250);
		qunlock(&bcp->q);
.
114d
112c
	int loop=0;
.
54c
	QLock	q;	/* qlock for sleepers on r */
.
## diffname port/stream.c 1990/03292
## diff -e /n/bootesdump/1990/0322/sys/src/9/mips/stream.c /n/bootesdump/1990/03292/sys/src/9/mips/stream.c
685c
	if(!c->stream)
.
631d
131c
		qunlock(bcp);
.
128,129c
			panic("waiting for blocks\n");
		qlock(bcp);
.
126d
54c
	QLock;		/* qlock for sleepers on r */
.
## diffname port/stream.c 1990/0331
## diff -e /n/bootesdump/1990/03292/sys/src/9/mips/stream.c /n/bootesdump/1990/0331/sys/src/9/mips/stream.c
828,829c
			if(q->flag & QHUNGUP){
				if(s->hread++ < 3)
					break;
				else
					error(0, Ehungup);
			}
.
616a
	s->hread = 0;
.
127a
		}
.
126c
		if(loop++ > 10){
			dumpqueues();
			dumpstack();
.
64a
 *  Dump all block information of how many blocks are in which queues
 */
void
dumpqueues(void)
{
	Queue *rq, *wq;
	Block *bp;
	int nblocks;

	/*
	 *  qlist holds (read, write) pairs, so step by two and report
	 *  both sides of every pair whose read side is marked in use
	 */
	for(rq = qlist; rq < &qlist[conf.nqueue]; rq += 2){
		if((rq->flag & QINUSE) == 0)
			continue;

		nblocks = 0;
		for(bp = rq->first; bp != 0; bp = bp->next)
			nblocks++;
		print("%s %ux  RD count %d len %d", rq->info->name, rq, nblocks, rq->len);

		wq = WR(rq);
		nblocks = 0;
		for(bp = wq->first; bp != 0; bp = bp->next)
			nblocks++;
		print("  WR count %d len %d\n", nblocks, wq->len);
	}
}

/*
.
## diffname port/stream.c 1990/0403
## diff -e /n/bootesdump/1990/0331/sys/src/9/mips/stream.c /n/bootesdump/1990/0403/sys/src/9/mips/stream.c
1012c
			FLOWCTL(q);
.
999c
		FLOWCTL(q);
.
946,947c
	sleep(&q->r, notfull, q->next);
.
941c
	return !QFULL(q->next);
.
772c
		if(q->len >= Streamhi || q->nb >= Streambhi){
.
768a
			q->nb++;
.
764a
		q->nb++;
.
390c
		q->nb--;
		if((q->flag&QHIWAT) && q->len < Streamhi/2 && q->nb < Streambhi){
.
371a
	q->nb++;
.
333c
	if(q->len >= Streamhi || q->nb >= Streambhi)
.
329a
		q->nb++;
.
325a
	q->nb++;
.
238a
	wq->len = wq->nb = 0;
.
232a
	q->len = q->nb = 0;
.
150,151c
			print("waiting for blocks\n");
.
148c
		if(loop++ == 10){
.
61c
	{ 1024 },
.
## diffname port/stream.c 1990/0406
## diff -e /n/bootesdump/1990/0403/sys/src/9/mips/stream.c /n/bootesdump/1990/0406/sys/src/9/mips/stream.c
945,948c
	return !QFULL((Queue *)arg);
.
395c
		if((q->flag&QHIWAT) && q->len<Streamhi/2 && q->nb<Streambhi/2){
.
## diffname port/stream.c 1990/0409
## diff -e /n/bootesdump/1990/0406/sys/src/9/mips/stream.c /n/bootesdump/1990/0409/sys/src/9/mips/stream.c
61c
	{ 4096 },
.
## diffname port/stream.c 1990/0509
## diff -e /n/bootesdump/1990/0409/sys/src/9/mips/stream.c /n/bootesdump/1990/0509/sys/src/9/mips/stream.c
83a
	print("\n");
	for(bcp=bclass; bcp<&bclass[Nclass-1]; bcp++){
		lock(bcp);
		for(count = 0, bp = bcp->first; bp; count++, bp = bp->next)
			;
		unlock(bcp);
		print("%d blocks of size %d\n", count, bcp->size);
	}
	print("\n");
.
73a
	print("\n");
.
72a
	Bclass *bcp;
.
## diffname port/stream.c 1990/0511
## diff -e /n/bootesdump/1990/0509/sys/src/9/mips/stream.c /n/bootesdump/1990/0511/sys/src/9/mips/stream.c
87c
	for(bcp=bclass; bcp<&bclass[Nclass]; bcp++){
.
84c
		print("  W c %d l %d f %ux\n", count, WR(q)->len, WR(q)->flag);
.
81c
		print("%s %ux  R c %d l %d f %ux", q->info->name, q, count,
			q->len, q->flag);
.
## diffname port/stream.c 1990/0513
## diff -e /n/bootesdump/1990/0511/sys/src/9/mips/stream.c /n/bootesdump/1990/0513/sys/src/9/mips/stream.c
194a
	if((bp->flags&S_CLASS) >= Nclass)
		panic("freeb class");
.
## diffname port/stream.c 1990/0629
## diff -e /n/bootesdump/1990/0513/sys/src/9/mips/stream.c /n/bootesdump/1990/0629/sys/src/9/mips/stream.c
744,758c
	streamexit(s, 1);
.
742c
	 *  leave it and free it
.
739a
	s->opens--;
.
735,738c
	if(s->opens == 1){
		/*
		 *  descend the stream closing the queues
		 */
		for(q = s->procq; q; q = q->next){
			if(q->info->close)
				(*q->info->close)(q->other);
			/* this may be 2 streams joined device end to device end */
			if(q == s->devq->other)
				break;
		}
	
		/*
		 *  ascend the stream flushing the queues
		 */
		for(q = s->devq; q; q = nq){
			nq = q->next;
			flushq(q);
		}
.
732c
	 *  decrement the reference count
.
714a
 *  Enter a stream.  Increment the reference count so it can't disappear
 *  under foot.
 */
int
streamenter(Stream *s)
{
	int rv;

	/*
	 *  a stream with no opens is refused (-1); otherwise take a
	 *  reference under the stream lock and return 0
	 */
	lock(s);
	if(s->opens != 0){
		s->inuse++;
		rv = 0;
	} else
		rv = -1;
	unlock(s);
	return rv;
}

/*
 *  Give up one reference to a stream.  When it is the last one, every
 *  queue pair is freed from the device end upward and the stream's
 *  identity is cleared.  `locked' non-zero means the caller already
 *  holds the stream lock; otherwise it is taken and released here.
 */
void
streamexit(Stream *s, int locked)
{
	Queue *q, *up;

	if(locked == 0)
		lock(s);

	if(s->inuse == 1){
		/* save next before freeq gives the pair away */
		q = s->devq;
		while(q != 0){
			up = q->next;
			freeq(q);
			q = up;
		}
		s->type = 0;
		s->dev = 0;
		s->id = 0;
	}
	s->inuse--;

	if(locked == 0)
		unlock(s);
}

/*
.
699a
				s->opens++;
.
662d
660a
	s->opens = 1;
.
518,519c
	if(bp->type == M_HANGUP)
		freeb(bp);
	else {
		freeb(bp);
		error(0, Ehungup);
	}
.
260a
 *  flush a queue
 */
static void
flushq(Queue *q)
{
	Queue *rq, *wq;
	Block *bp;

	/*
	 *  discard everything queued on both sides of the pair;
	 *  unlike freeq the pair stays allocated
	 */
	rq = RD(q);
	for(bp = getq(rq); bp != 0; bp = getq(rq))
		freeb(bp);

	wq = WR(rq);
	for(bp = getq(wq); bp != 0; bp = getq(wq))
		freeb(bp);
}

/*
.
## diffname port/stream.c 1990/0702
## diff -e /n/bootesdump/1990/0629/sys/src/9/mips/stream.c /n/bootesdump/1990/0702/sys/src/9/mips/stream.c
248a
	wq->flag = QINUSE;
.
## diffname port/stream.c 1990/0707
## diff -e /n/bootesdump/1990/0702/sys/src/9/mips/stream.c /n/bootesdump/1990/0707/sys/src/9/mips/stream.c
79,85c
		print("%s %ux  R n %d l %d f %ux r %ux", q->info->name, q, q->nb,
			q->len, q->flag, &(q->r));
		print("  W n %d l %d f %ux r %ux\n", WR(q)->nb, WR(q)->len, WR(q)->flag,
			&(WR(q)->r));
		dumpblocks(q, 'R');
		dumpblocks(WR(q), 'W');
.
67a
dumpblocks(Queue *q, char c)
{
	Block *bp;
	uchar *cp;

	lock(q);
	for(bp = q->first; bp; bp = bp->next){
		print("%c%d%c", c, bp->wptr-bp->rptr, (bp->flags&S_DELIM)?'D':' ');
		for(cp = bp->rptr; cp<bp->wptr && cp<bp->rptr+10; cp++)
			print(" %uo", *cp);
		print("\n");
	}
	unlock(q);
}

void
.
## diffname port/stream.c 1990/0801
## diff -e /n/bootesdump/1990/0707/sys/src/9/mips/stream.c /n/bootesdump/1990/0801/sys/src/9/mips/stream.c
1157a
}

/*
 *  stat a stream.  the length is the number of bytes up to the
 *  first delimiter.
 */
void
streamstat(Chan *c, char *db, char *name)
{
	Dir dir;
	Stream *s;
	Queue *q;
	Block *bp;
	long n;

	s = c->stream;
	if(s == 0)
		panic("streamstat");

	q = RD(s->procq);
	lock(q);
	for(n=0, bp=q->first; bp; bp = bp->next){
		n += BLEN(bp);
		if(bp->flags&S_DELIM)
			break;
	}
	unlock(q);

	devdir(c, c->qid, name, n, 0, &dir);
	convD2M(&dir, db);
.
75c
		print("%c%d%c", c, bp->wptr-bp->rptr, (bp->flags&S_DELIM));
.
8a
#include	"fcall.h"
.
## diffname port/stream.c 1990/08272
## diff -e /n/bootesdump/1990/0801/sys/src/9/mips/stream.c /n/bootesdump/1990/08272/sys/src/9/mips/stream.c
137c
				bp->base = (uchar *)ialloc(bcp->size, i == 0);
.
135a
			/*
			 *  The i == 0 means that each allocation range
			 *  starts on a page boundary.  This makes sure
			 *  no block crosses a page boundary.
			 */
.
60,61c
	{ 64 },
	{ 256 },
.
50a
 *
 *  NOTE: to help the mappings on the IO2 and IO3 boards, the data pointed
 *	  to by a block must not cross a 4k boundary.  Therefore:
 *	  1) all the following block sizes divide evenly into 4k
 *	  2) all the blocks are ialloc'd to not cross 4k boundaries
.
## diffname port/stream.c 1990/0905
## diff -e /n/bootesdump/1990/08272/sys/src/9/mips/stream.c /n/bootesdump/1990/0905/sys/src/9/mips/stream.c
1199a

/*
 *  Register a line discipline so it can be pushed by name.  It goes in
 *  the first empty slot of lds; registering the same Qinfo twice is a
 *  no-op.  Panics when all Nlds slots are taken.
 */
void
newqinfo(Qinfo *qi)
{
	int i;

	i = 0;
	while(i < Nlds && lds[i] != 0){
		if(lds[i] == qi)
			return;
		i++;
	}
	if(i == Nlds)
		panic("pushable");
	lds[i] = qi;
}

.
629,636d
482,529d
146,147c
			if(bcp->size){
				if(bcp->size > left){
					left = bcp->size>4096 ? bcp->size : 4096;
					ptr = (uchar *)ialloc(left, 1);
				}
				bp->base = ptr;
				ptr += bcp->size;
				left -= bcp->size;
			}
.
135a
	left = 0;
.
129a
	uchar *ptr;
.
127c
	int class, i, n, left;
.
122a
 *
 *  All data areas are aligned to their size.
 *
 *  No data area crosses a 4k boundary.  This allows us to use the
 *  VME/SCSI/LANCE to MP bus maps on the SGI power series machines.
.
70a
 *  the per stream directory structure
 */
Dirtab streamdir[]={
	"data",		Sdataqid,	0,			0600,
	"ctl",		Sctlqid,	0,			0600,
};

/*
.
27,39d
## diffname port/stream.c 1990/09051
## diff -e /n/bootesdump/1990/0905/sys/src/9/mips/stream.c /n/bootesdump/1990/09051/sys/src/9/mips/stream.c
53a
	{ 2048 },
.
12c
	Nclass=5,	/* number of block classes */
.
## diffname port/stream.c 1990/0907
## diff -e /n/bootesdump/1990/09051/sys/src/9/mips/stream.c /n/bootesdump/1990/0907/sys/src/9/mips/stream.c
1154,1170d
590a
 *  the per stream directory structure
 */
Dirtab streamdir[]={
	"data",		Sdataqid,	0,			0600,
	"ctl",		Sctlqid,	0,			0600,
};

/*
.
491a
 *  make sure the first block has n bytes
 */
Block *
pullup(Block *bp, int n)
{
	Block *nbp;
	int i;

	/*
	 *  Contract: returns the head of a list whose first block holds at
	 *  least n contiguous bytes, taking bytes from the following blocks
	 *  as needed (blocks emptied on the way are freed).  If the whole
	 *  list holds fewer than n bytes it is freed and 0 is returned.
	 */

	/*
	 *  this should almost always be true, the rest is
	 *  just to avoid every caller checking.
	 */
	if(BLEN(bp) >= n)
		return bp;

	/*
	 *  if not enough room in the first block,
	 *  add another to the front of the list.
	 */
	if(bp->lim - bp->rptr < n){
		nbp = allocb(n);
		nbp->next = bp;
		bp = nbp;
	}

	/*
	 *  copy bytes from the trailing blocks into the first;
	 *  n counts the bytes still missing
	 */
	n -= BLEN(bp);
	while(nbp = bp->next){
		i = BLEN(nbp);
		if(i >= n) {
			/* this block can supply everything still missing */
			memcpy(bp->wptr, nbp->rptr, n);
			bp->wptr += n;
			nbp->rptr += n;
			return bp;
		} else {
			/* swallow the whole block and keep going */
			memcpy(bp->wptr, nbp->rptr, i);
			bp->wptr += i;
			bp->next = nbp->next;
			nbp->next = 0;
			freeb(nbp);
			n -= i;
		}
	}
	freeb(bp);
	return 0;
}

/*
.
149,157c
			if(bcp->size)
				bp->base = (uchar *)ialloc(bcp->size, i == 0);
.
138d
131d
128c
	int class, i, n;
.
119,123d
59,66d
54d
26a
void
newqinfo(Qinfo *qi)
{
	int i;

	/*
	 *  find the first empty slot, giving up early if qi is
	 *  already registered; a full table is fatal
	 */
	i = 0;
	while(i < Nlds && lds[i] != 0){
		if(lds[i] == qi)
			return;
		i++;
	}
	if(i == Nlds)
		panic("pushable");
	lds[i] = qi;
}

.
12c
	Nclass=4,	/* number of block classes */
.
## diffname port/stream.c 1990/0911
## diff -e /n/bootesdump/1990/0907/sys/src/9/mips/stream.c /n/bootesdump/1990/0911/sys/src/9/mips/stream.c
1198a
}

/*
 *  Print one line per block queued on q: the tag character c, the
 *  block's byte count, a 'D' if the block carries S_DELIM (a space
 *  otherwise), and up to the first 10 data bytes in octal.  The queue is
 *  locked while it is walked.
 */
void
dumpblocks(Queue *q, char c)
{
	Block *bp;
	uchar *cp;

	lock(q);
	for(bp = q->first; bp; bp = bp->next){
		/* print a readable marker, not the raw flag bits as a character */
		print("%c%d%c", c, bp->wptr-bp->rptr, (bp->flags&S_DELIM)?'D':' ');
		for(cp = bp->rptr; cp<bp->wptr && cp<bp->rptr+10; cp++)
			print(" %uo", *cp);
		print("\n");
	}
	unlock(q);
}

void
dumpqueues(void)
{
	Queue *q;
	int count;
	Block *bp;
	Bclass *bcp;

	print("\n");
	for(q = qlist; q < qlist + conf.nqueue; q++, q++){
		if(!(q->flag & QINUSE))
			continue;
		print("%s %ux  R n %d l %d f %ux r %ux", q->info->name, q, q->nb,
			q->len, q->flag, &(q->r));
		print("  W n %d l %d f %ux r %ux\n", WR(q)->nb, WR(q)->len, WR(q)->flag,
			&(WR(q)->r));
		dumpblocks(q, 'R');
		dumpblocks(WR(q), 'W');
	}
	print("\n");
	for(bcp=bclass; bcp<&bclass[Nclass]; bcp++){
		lock(bcp);
		for(count = 0, bp = bcp->first; bp; count++, bp = bp->next)
			;
		unlock(bcp);
		print("%d blocks of size %d\n", count, bcp->size);
	}
	print("\n");
.
1142,1143c
out:
/*	qunlock(&s->wrlock);
	poperror(); /**/
.
1091,1093c
		goto out;
.
1081a
	 */
.
1075,1076d
1072a
	s = c->stream;

.
579,581c
	for(qi = lds; qi; qi = qi->next)
		if(strcmp(qi->name, name)==0)
			return qi;
.
575c
	Qinfo *qi;
.
244a
 *  pad a block to the front with n bytes
 */
Block *
padb(Block *bp, int n)
{
	Block *nbp;

	/*
	 *  without n spare bytes in front of the data, chain a new block
	 *  ahead of bp whose n bytes sit at the very end of its data area
	 */
	if(bp->base == 0 || bp->rptr - bp->base < n){
		nbp = allocb(n);
		nbp->wptr = nbp->lim;
		nbp->rptr = nbp->wptr - n;
		nbp->next = bp;
		return nbp;
	}

	/* otherwise just back rptr up over the unused space */
	bp->rptr -= n;
	return bp;
}

/*
.
186,189d
172d
156a
 *  make known a stream module and call its initialization routine, if
 *  it has one.
 */
void
newqinfo(Qinfo *qi)
{
	/* push qi on the front of the lds list, which qinfofind walks via next */
	qi->next = lds;
	lds = qi;
	/* give the module its one-time initialization, if it asks for one */
	if(qi->reset)
		(*qi->reset)();
}

/*
.
153a

	/*
	 *  make stream modules available
	 */
	streaminit0();
.
147c
				bp->base = (uchar *)ialloc(bcp->size, 0);
.
141,145d
130a
	/*
	 *  allocate blocks, queues, and streams
	 */
.
79,118d
70,77c
#include "stream.h"
.
65,66c
	{ 68 },
	{ 260 },
.
51,55d
27,39d
25c
static Qinfo *lds;
.
13d
## diffname port/stream.c 1990/0914
## diff -e /n/bootesdump/1990/0911/sys/src/9/mips/stream.c /n/bootesdump/1990/0914/sys/src/9/mips/stream.c
1160d
1151,1158c
		n = 0;
	else {
		q = RD(s->procq);
		lock(q);
		for(n=0, bp=q->first; bp; bp = bp->next){
			n += BLEN(bp);
			if(bp->flags&S_DELIM)
				break;
		}
		unlock(q);
.
## diffname port/stream.c 1990/0930
## diff -e /n/bootesdump/1990/0914/sys/src/9/mips/stream.c /n/bootesdump/1990/0930/sys/src/9/mips/stream.c
1106,1108d
1069c
	if(!docopy && GLOBAL(a)){
.
1052,1060c
	if(STREAMTYPE(c->qid) != Sdataqid)
		return streamctlwrite(c, a, n);
.
1041,1049d
981a
	if(STREAMTYPE(c->qid) != Sctlqid)
		panic("streamctlwrite %lux", c->qid);
	s = c->stream;

.
980a
	Stream *s;
.
977c
streamctlwrite(Chan *c, void *a, long n)
.
919a
	s = c->stream;
.
903,915c
	if(STREAMTYPE(c->qid) != Sdataqid)
		return streamctlread(c, vbuf, n);
.
901d
898,899c
	int left, i;
.
886c
	Queue *q;

	q = (Queue *)x;
	return (q->flag&QHUNGUP) || q->first!=0;
.
880a
 *  return the stream id
 */
long
streamctlread(Chan *c, void *vbuf, long n)
{
	uchar *buf = vbuf;
	char num[32];
	Stream *s;

	/*
	 *  A qid of type Sctlqid reads back the stream's id as a decimal
	 *  string.  Any other qid must have CHDIR set, in which case the
	 *  read is handed to devdirread with streamgen; anything else is
	 *  a kernel bug and panics.
	 */
	s = c->stream;
	if(STREAMTYPE(c->qid) == Sctlqid){
		sprint(num, "%d", s->id);
		return stringread(c, buf, n, num);
	} else {
		if(CHDIR & c->qid)
			return devdirread(c, vbuf, n, 0, 0, streamgen);
		else
			panic("streamctlread");
	}
	return 0;	/* not reached; keeps every path returning a value */
}

/*
.
130c
	 *  look for a free block
.
## diffname port/stream.c 1990/1009
## diff -e /n/bootesdump/1990/0930/sys/src/9/mips/stream.c /n/bootesdump/1990/1009/sys/src/9/mips/stream.c
1177c
		print("%c%d%c", c, bp->wptr-bp->rptr, (bp->flags&S_DELIM)?'D':' ');
.
819a
/*
 *  Close the stream attached to channel c.  A channel with no
 *  stream is left alone.
 */
void
streamclose(Chan *c)
{
	if(c->stream)
		streamclose1(c->stream);
}
.
783,788d
780d
776c
streamclose1(Stream *s)
.
724c
	c->stream = streamnew(c->type, c->dev, STREAMID(c->qid), qi, 0);
.
702c
	 *  if the stream already exists, just increment the reference counts.
.
673c
	if(noopen)
		s->opens = 0;
	else
		s->opens = 1;
.
661,667c
	s->type = type;
	s->dev = dev;
	s->id = id;
.
659c
	 *  identify the stream
.
654c
		streamclose1(s);
.
631c
streamnew(ushort type, ushort dev, ushort id, Qinfo *qi, int noopen)
.
628c
 *  create a new stream, if noopen is non-zero, don't increment the open count
.
244a
	wq->ptr = 0;
.
237a
	q->ptr = 0;
.
## diffname port/stream.c 1990/1011
## diff -e /n/bootesdump/1990/1009/sys/src/9/mips/stream.c /n/bootesdump/1990/1011/sys/src/9/mips/stream.c
787,795c
		if(!waserror()){
			/*
			 *  descend the stream closing the queues
			 */
			for(q = s->procq; q; q = q->next){
				if(q->info->close)
					(*q->info->close)(q->other);
				/*
				 *  this may be 2 streams joined device end to device end
				 */
				if(q == s->devq->other)
					break;
			}
			poperror();
.
## diffname port/stream.c 1990/1018
## diff -e /n/bootesdump/1990/1011/sys/src/9/mips/stream.c /n/bootesdump/1990/1018/sys/src/9/mips/stream.c
1050a
	qunlock(&q->rlock);
.
1049a
	qlock(&q->rlock);
.
## diffname port/stream.c 1990/1101
## diff -e /n/bootesdump/1990/1018/sys/src/9/mips/stream.c /n/bootesdump/1990/1101/sys/src/9/mips/stream.c
793a
				WR(q)->put = nullput;

.
769a
	return rv;
.
767a
	rv = s->inuse;
.
753a
	int rv;
.
749c
int
.
## diffname port/stream.c 1990/1104
## diff -e /n/bootesdump/1990/1101/sys/src/9/mips/stream.c /n/bootesdump/1990/1104/sys/src/9/mips/stream.c
758a
		if(s->opens != 0)
			print("streamexit %d %s\n", s->opens, s->devq->info->name);

.
754a
	char *name;
.
## diffname port/stream.c 1990/1113
## diff -e /n/bootesdump/1990/1104/sys/src/9/mips/stream.c /n/bootesdump/1990/1113/sys/src/9/mips/stream.c
1061a
	poperror();
.
1059a
	if(waserror()){
		qunlock(&q->rlock);
		nexterror();
	}
.
137a
		poperror();
.
135a
		if(waserror()){
			qunlock(bcp);
			nexterror();
		}
.
## diffname port/stream.c 1990/11151
## diff -e /n/bootesdump/1990/1113/sys/src/9/mips/stream.c /n/bootesdump/1990/11151/sys/src/9/mips/stream.c
766c
			panic("streamexit %d %s\n", s->opens, s->devq->info->name);
.
19c
Qinfo procinfo =
{
	stputq,
	nullput,
	0,
	0,
	"process"
};
.
## diffname port/stream.c 1990/11161
## diff -e /n/bootesdump/1990/11151/sys/src/9/mips/stream.c /n/bootesdump/1990/11161/sys/src/9/mips/stream.c
838c
	qunlock(s);
.
821c
			WR(q)->put = nullput;

			/*
			 *  this may be 2 streams joined device end to device end
			 */
			if(q == s->devq->other)
				break;
.
813,819c
				poperror();
.
806,810c
		/*
		 *  descend the stream closing the queues
		 */
		for(q = s->procq; q; q = q->next){
			if(!waserror()){
.
804c
	qlock(s);
.
787c
		qunlock(s);
.
770c
		qlock(s);
.
753c
	qunlock(s);
.
749c
		qunlock(s);
.
747c
	qlock(s);
.
730c
			qunlock(s);
.
727c
				qunlock(s);
.
720c
			qlock(s);
.
700c
	qunlock(s);
.
667c
		qunlock(s);
.
658c
				qunlock(s);
.
655c
			if(canqlock(s)){
.
## diffname port/stream.c 1990/11211
## diff -e /n/bootesdump/1990/11161/sys/src/9/mips/stream.c /n/bootesdump/1990/11211/sys/src/9/mips/stream.c
1106c
		error(Ehungup);
.
1098c
	if(STREAMTYPE(c->qid.path) != Sdataqid)
.
1026c
	if(STREAMTYPE(c->qid.path) != Sctlqid)
.
981c
					error(Ehungup);
.
956c
	if(STREAMTYPE(c->qid.path) != Sdataqid)
.
925c
		if(CHDIR & c->qid.path)
.
921c
	if(STREAMTYPE(c->qid.path) == Sctlqid){
.
847,848c
		return 1;
	return streamclose1(c->stream);
.
840c
int
.
838a
	return rv;
.
832c
	rv = --(s->opens);
.
799a
	int rv;
.
795c
int
.
737c
	c->stream = streamnew(c->type, c->dev, STREAMID(c->qid.path), qi, 0);
.
723c
		 	&& s->id == STREAMID(c->qid.path)){
.
719c
		   && s->id == STREAMID(c->qid.path)){
.
664c
		error(Enostream);
.
636c
	devdir(c, (Qid){STREAMQID(STREAMID(c->qid.path),tab->qid.path), 0}, tab->name, tab->length,
.
611,612c
	"data",		{Sdataqid},	0,			0600,
	"ctl",		{Sctlqid},	0,			0600,
.
561c
	error(Ebadld);
.
557c
		error(Ebadld);
.
544c
		error(Ehungup);
.
338c
		error(Ebadld);
.
242c
		error(Enoqueue);
.
## diffname port/stream.c 1990/1127
## diff -e /n/bootesdump/1990/11211/sys/src/9/mips/stream.c /n/bootesdump/1990/1127/sys/src/9/mips/stream.c
849c
		return;
.
140a
		if(newblock(bcp) == 0)
			continue;
.
113a
 *  upgrade a class 0 block to another class (called with bcp qlocked)
 */
int
newblock(Bclass *bcp)
{
	Page *page;
	int n;
	Block *bp;
	uchar *cp;

	/*
	 *  Add one page worth of blocks to the free list of class bcp.
	 *  Returns -1, allocating nothing, once the class has made more
	 *  than bcp->lim blocks; returns 0 after blocks have been added.
	 *  The caller compares the result with 0, so every path must
	 *  return a value.
	 */
	if(bcp->made > bcp->lim)
		return -1;

	/*
	 *  both cases below carve up one newly mapped page
	 */
	page = newpage(1, 0, 0);
	page->va = VA(kmap(page));

	if(bcp == bclass){
		/*
		 *  create some level zero blocks and return;
		 *  the page holds Block headers only, no data areas
		 */
		n = BY2PG/sizeof(Block);
		bp = (Block *)(page->va);
		while(n-- > 0){
			bp->flags = 0;
			bp->base = bp->lim = bp->rptr = bp->wptr = 0;
			if(bcp->first)
				bcp->last->next = bp;
			else
				bcp->first = bp;
			bcp->last = bp;
			bcp->made++;
			bp++;
		}
	} else {
		/*
		 *  create a page worth of new blocks;
		 *  the page is cut into bcp->size byte data areas
		 */
		n = BY2PG/bcp->size;
		cp = (uchar *)(page->va);

		while(n-- > 0){
			/*
			 *  upgrade a level 0 block: take one from class 0
			 *  and move its count from bclass to bcp
			 */
			bp = allocb(0);
			qlock(bclass);
			bclass->made--;
			bcp->made++;
			bp->flags = bcp - bclass;
			qunlock(bclass);

			/*
			 *  tack on the data area
			 */
			bp->base = bp->rptr = bp->wptr = cp;
			cp += bcp->size;
			bp->lim = cp;
			if(bcp->first)
				bcp->last->next = bp;
			else
				bcp->first = bp;
			bcp->last = bp;
		}
	}
	return 0;
}

/*
.
84,91c
		bcp->lim = n;
		bcp->made = 0;
.
77,78c

	/*
	 *  set limits on blocks
	 */
.
73c
	 *  allocate queues, streams
.
69d
46a
	int	lim;
	int	made;
.
38d
## diffname port/stream.c 1990/1202
## diff -e /n/bootesdump/1990/1127/sys/src/9/mips/stream.c /n/bootesdump/1990/1202/sys/src/9/mips/stream.c
1117a
		freeb(bp);
	} else if(streamparse("look", bp)){
		qlook(s, (char *)bp->rptr);
.
1083a
 *	look ldname		-- look for a line discipline
.
1078a
 *  look for an instance of the line discipline `name' on
 *  the stream `s'
 */
void
qlook(Stream *s, char *name)
{
	Queue *cur;

	/*
	 *  Walk down from the process end of the stream.  Returning
	 *  quietly means a queue whose module is called name was found;
	 *  otherwise the walk ends in errors("not found").
	 */
	cur = s->procq;
	while(cur != 0){
		if(strcmp(cur->info->name, name) == 0)
			return;

		/*
		 *  stop at the device end: this may be 2 streams
		 *  joined device end to device end
		 */
		if(cur == s->devq->other)
			break;
		cur = cur->next;
	}
	errors("not found");
}

/*
.
## diffname port/stream.c 1990/1212
## diff -e /n/bootesdump/1990/1210/sys/src/9/mips/stream.c /n/bootesdump/1990/1212/sys/src/9/port/stream.c
1170c
	q->rp = &q->r;
	sleep(q->rp, notfull, q->next);
.
1051c
			q->rp = &q->r;
			sleep(q->rp, &isinput, (void *)q);
.
957c
		wakeup(q->rp);
.
931c
		wakeup(q->other->rp);
.
498c
			wakeup(q->other->next->other->rp);
.
325a
	wq->rp = &wq->r;
.
316a
	q->rp = &q->r;
.
## diffname port/stream.c 1990/1214
## diff -e /n/bootesdump/1990/1212/sys/src/9/port/stream.c /n/bootesdump/1990/1214/sys/src/9/port/stream.c
936d
917c
		return 0;
.
450d
424d
174c
	return 0;
.
120c
		return -1;
.
111a
int
.
## diffname port/stream.c 1990/1219
## diff -e /n/bootesdump/1990/1214/sys/src/9/port/stream.c /n/bootesdump/1990/1219/sys/src/9/port/stream.c
216a
		splhi();
.
## diffname port/stream.c 1990/1229
## diff -e /n/bootesdump/1990/1219/sys/src/9/port/stream.c /n/bootesdump/1990/1229/sys/src/9/port/stream.c
244a
	int x;
.
217d
## diffname port/stream.c 1991/0115
## diff -e /n/bootesdump/1990/1229/sys/src/9/port/stream.c /n/bootesdump/1991/0115/sys/src/9/port/stream.c
59,60d
## diffname port/stream.c 1991/0316
## diff -e /n/bootesdump/1991/0115/sys/src/9/port/stream.c /n/bootesdump/1991/0316/sys/src/9/port/stream.c
598a

.
444a
blen(Block *bp)
{
	int len;

	len = 0;
	while(bp) {
		len += BLEN(bp);
		bp = bp->next;
	}

	return len;
}

/*
 * bround - round a block to chain to some 2^n number of bytes
 */
int
bround(Block *bp, int amount)
{
	Block *last;
	int len, pad;

	/*
	 *  amount is applied as a bit mask, so the total is rounded up
	 *  to a multiple of amount+1.
	 *  NOTE(review): this presumes amount is 2^n - 1; confirm callers.
	 *  Returns the length of the chain after padding.
	 */
	len = 0;
	SET(last);	/* only assigned in the loop; unused for an empty chain, where pad is 0 */
	while(bp) {
		len += BLEN(bp);
		last = bp;
		bp = bp->next;
	}

	pad = ((len + amount) & ~amount) - len;
	if(pad) {
		/*
		 *  append a zero filled block of pad bytes and move the
		 *  delimiter onto it.  wptr must be advanced past the
		 *  zeros, otherwise the pad block counts as empty and the
		 *  chain is shorter than the length returned.
		 */
		last->next = allocb(pad);
		last->flags &= ~S_DELIM;
		last = last->next;
		memset(last->wptr, 0, pad);
		last->wptr += pad;
		last->flags |= S_DELIM;
	}

	return len + pad;
}

int
.
443a

.
## diffname port/stream.c 1991/0317
## diff -e /n/bootesdump/1991/0316/sys/src/9/port/stream.c /n/bootesdump/1991/0317/sys/src/9/port/stream.c
1380c
	print("%d queues\n", qcount);
.
1372a
		qcount++;
.
1369a
	qcount = 0;
.
1365c
	int count, qcount;
.
481a
		last = last->next;
		memset(last->wptr, 0, pad);
		last->wptr += pad;
		last->flags |= S_DELIM;
		
.
479,480d
469c
	SET(last);	/* Ken's magic */

.
460c
 * bround - round a block chain to some 2^n number of bytes
.
## diffname port/stream.c 1991/0318
## diff -e /n/bootesdump/1991/0317/sys/src/9/port/stream.c /n/bootesdump/1991/0318/sys/src/9/port/stream.c
1280c
				memmove(bp->wptr, a, i);
.
1273c
				memmove(bp->wptr, a, rem);
.
1173c
	memmove(bp->wptr, a, n);
.
1114c
			memmove(buf, bp->rptr, left);
.
1105c
			memmove(buf, bp->rptr, i);
.
1021c
	memmove(buf, str + c->offset, n);
.
610c
			memmove(bp->wptr, nbp->rptr, i);
.
605c
			memmove(bp->wptr, nbp->rptr, n);
.
## diffname port/stream.c 1991/0320
## diff -e /n/bootesdump/1991/0318/sys/src/9/port/stream.c /n/bootesdump/1991/0320/sys/src/9/port/stream.c
1381,1382c
		print("  W n %d l %d f %ux r %ux next %lux put %lux Rz %lux\n", 
			WR(q)->nb, WR(q)->len,
			WR(q)->flag, &(WR(q)->r), q->next, q->put, q->rp);
.
## diffname port/stream.c 1991/0323
## diff -e /n/bootesdump/1991/0320/sys/src/9/port/stream.c /n/bootesdump/1991/0323/sys/src/9/port/stream.c
1379,1380c
		print("%10s %ux  R n %d l %d f %ux r %ux", q->info->name, q,
			q->nb, q->len, q->flag, &(q->r));
.
## diffname port/stream.c 1991/0328
## diff -e /n/bootesdump/1991/0323/sys/src/9/port/stream.c /n/bootesdump/1991/0328/sys/src/9/port/stream.c
261,264d
246,259c
	for(; bp; bp = nbp){
		bcp = &bclass[bp->flags & S_CLASS];
		lock(bcp);
		bp->rptr = bp->wptr = 0;
		if(bcp->first)
			bcp->last->next = bp;
		else
			bcp->first = bp;
		bcp->last = bp;
		nbp = bp->next;
		bp->next = 0;
		unlock(bcp);
		if(bcp->r.p)
			wakeup(&bcp->r);
.
241d
239a
	Block *nbp;
.
229a
	if(bp->lim-bp->rptr<size && size<4096)
		panic("allocb %lux %lux %d %ux %d", bp->lim, bp->rptr,
			size, bp->flags, bcp-bclass);
.
## diffname port/stream.c 1991/0401
## diff -e /n/bootesdump/1991/0328/sys/src/9/port/stream.c /n/bootesdump/1991/0401/sys/src/9/port/stream.c
714a
		while(*bp->rptr==' ' && bp->wptr>bp->rptr)
			bp->rptr++;
.
## diffname port/stream.c 1991/0404
## diff -e /n/bootesdump/1991/0401/sys/src/9/port/stream.c /n/bootesdump/1991/0404/sys/src/9/port/stream.c
1252c
	if(!docopy && isphys(a)){
.
## diffname port/stream.c 1991/0411
## diff -e /n/bootesdump/1991/0404/sys/src/9/port/stream.c /n/bootesdump/1991/0411/sys/src/9/port/stream.c
1249,1250c
	if(q->other->flag & QHUNGUP){
		if(s->err)
			errors((char*)(s->err->rptr));
		else
			error(Ehungup);
	}
.
1094c
				if(s->err)
					errors((char*)s->err->rptr);
				else if(s->hread++<3)
.
1039c
		return stringread(c, buf, n, num, c->offset);
.
1022c
	memmove(buf, str + offset, n);
.
1017c
	i -= offset;
.
1012c
stringread(Chan *c, uchar *buf, long n, char *str, ulong offset)
.
977c
		s = q->ptr;
		if(bp->rptr<bp->wptr && s->err==0)
			s->err = bp;
		else
			freeb(bp);
.
974a
	Stream *s;
.
897a
		if(s->err)
			freeb(s->err);
.
803a
	WR(q)->ptr = s;
	RD(q)->ptr = s;
.
792a
	s->err = 0;
.
## diffname port/stream.c 1991/0413
## diff -e /n/bootesdump/1991/0411/sys/src/9/port/stream.c /n/bootesdump/1991/0413/sys/src/9/port/stream.c
959a
	if(*err)
		errors(err);
.
931c
			if(waserror()){
				if(*err == 0)
					strncpy(err, u->error, ERRLEN-1);
			} else {
.
925a
	*err = 0;
.
920a
	char err[ERRLEN];
.
410a
	qunlock(s);
.
403a
	if(waserror()){
		qunlock(s);
		nexterror();
	}
	qlock(s);
.
388a
	qunlock(s);
.
384a
	qlock(s);
.
## diffname port/stream.c 1991/0419
## diff -e /n/bootesdump/1991/0413/sys/src/9/port/stream.c /n/bootesdump/1991/0419/sys/src/9/port/stream.c
598a
	 */
.
250a
		bp->flags = S_CLASS;			/* Check for doulbe free */
.
225a
	bp->flags = bcp - bclass;
.
## diffname port/stream.c 1991/0420
## diff -e /n/bootesdump/1991/0419/sys/src/9/port/stream.c /n/bootesdump/1991/0420/sys/src/9/port/stream.c
252c
		bp->flags = bp->flags|S_CLASS;		/* Check for doulbe free */
.
249a

.
243a
	ulong mark[1];
.
## diffname port/stream.c 1991/0421
## diff -e /n/bootesdump/1991/0420/sys/src/9/port/stream.c /n/bootesdump/1991/0421/sys/src/9/port/stream.c
55c
	{ 268 },
.
## diffname port/stream.c 1991/0501
## diff -e /n/bootesdump/1991/0421/sys/src/9/port/stream.c /n/bootesdump/1991/0501/sys/src/9/port/stream.c
1152c
	}
.
## diffname port/stream.c 1991/0502
## diff -e /n/bootesdump/1991/0501/sys/src/9/port/stream.c /n/bootesdump/1991/0502/sys/src/9/port/stream.c
1324a
	poperror();
.
1288c
	/*
	 *  if an error occurs during write n,
	 *  force a delim before write n+1
	 */
	if(waserror()){
		s->forcedelim = 1;
		nexterror();
	}
	if(s->forcedelim){
		FLOWCTL(q);
		bp = allocb(0);
		bp->flags |= S_DELIM;
		bp->type = M_DATA;
		PUTNEXT(q, bp);
		s->forcedelim = 0;
	}

	if(0 && !docopy && isphys(a)){
.
1270a
	if(n == 1 && *((char*)a) == 1)
		print("u->p->pid %d %s\n", u->p->pid, u->p->text);

.
1012a
		if(BLEN(bp) == 1 && *(bp->rptr) == 1)
			print("stputq u->p->pid %d %s\n", u->p->pid, u->p->text);

.
## diffname port/stream.c 1991/0504
## diff -e /n/bootesdump/1991/0502/sys/src/9/port/stream.c /n/bootesdump/1991/0504/sys/src/9/port/stream.c
810a
	s->forcedelim = 0;
.
## diffname port/stream.c 1991/0507
## diff -e /n/bootesdump/1991/0504/sys/src/9/port/stream.c /n/bootesdump/1991/0507/sys/src/9/port/stream.c
1014,1016d
## diffname port/stream.c 1991/0511
## diff -e /n/bootesdump/1991/0507/sys/src/9/port/stream.c /n/bootesdump/1991/0511/sys/src/9/port/stream.c
1272,1274d
## diffname port/stream.c 1991/0516
## diff -e /n/bootesdump/1991/0511/sys/src/9/port/stream.c /n/bootesdump/1991/0516/sys/src/9/port/stream.c
254c
		bp->flags = bp->flags|S_CLASS;		/* Check for double free */
.
## diffname port/stream.c 1991/0614
## diff -e /n/bootesdump/1991/0516/sys/src/9/port/stream.c /n/bootesdump/1991/0614/sys/src/9/port/stream.c
421a
	poperror();
.
## diffname port/stream.c 1991/0705
## diff -e /n/bootesdump/1991/0614/sys/src/9/port/stream.c /n/bootesdump/1991/0705/sys/src/9/port/stream.c
422d
144,145d
125,126d
120a
	page = newpage(1, 0, 0);
	page->va = VA(kmap(page));
.
## diffname port/stream.c 1991/0809
## diff -e /n/bootesdump/1991/0705/sys/src/9/port/stream.c /n/bootesdump/1991/0809/sys/src/9/port/stream.c
1131a
			continue;
		}

		if(s->flushmsg){
			if(bp->flags & S_DELIM)
				s->flushmsg = 0;
			freeb(bp);
.
1118d
1109a
		/*
		 *  notes will flush the rest of any partially
		 *  read message.
		 */
		if(n != left)
			s->flushmsg = 1;
.
1107a
	left = n;
.
825a
	s->flushmsg = 0;
.
## diffname port/stream.c 1991/0811
## diff -e /n/bootesdump/1991/0809/sys/src/9/port/stream.c /n/bootesdump/1991/0811/sys/src/9/port/stream.c
1165a

	/*
	 *  free completely read blocks
	 */
	if(tofree)
		freeb(tofree);
.
1157,1158d
1154,1155c
			bp->next = tofree;
			tofree = bp;
			if(bp->flags & S_DELIM)
.
1142,1148d
1116,1117c
		while(tofree){
			bp = tofree;
			tofree = bp->next;
			bp->next = 0;
			putbq(q, bp);
		}
.
1113,1114c
		 *  put any partially read message back into the
		 *  queue
.
1110a
	tofree = 0;
.
1096a
	Block *tofree;
.
## diffname port/stream.c 1991/0831
## diff -e /n/bootesdump/1991/0811/sys/src/9/port/stream.c /n/bootesdump/1991/0831/sys/src/9/port/stream.c
1465c
		print("%d byte blocks: %d made %d free\n", bcp->size,
			bcp->made, count);
.
213d
210,211c
		newblock(bcp);
.
207c
			unlock(bcp);
.
202,205d
156c
			unlock(bclass);
.
152c
			lock(bclass);
.
118,120d
84d
77c
	 *  set block classes
.
49,50d
46d
## diffname port/stream.c 1991/0901
## diff -e /n/bootesdump/1991/0831/sys/src/9/port/stream.c /n/bootesdump/1991/0901/sys/src/9/port/stream.c
250,251d
## diffname port/stream.c 1991/0904
## diff -e /n/bootesdump/1991/0901/sys/src/9/port/stream.c /n/bootesdump/1991/0904/sys/src/9/port/stream.c
1097a
	q = 0;
.
## diffname port/stream.c 1991/0926
## diff -e /n/bootesdump/1991/0904/sys/src/9/port/stream.c /n/bootesdump/1991/0926/sys/src/9/port/stream.c
1308,1345c
	/*
	 *  send it down stream
	 */
	last->flags |= S_DELIM;
	FLOWCTL(q);
	PUTNEXT(q, first);
.
1304,1305c
		a = ((char*)a) + i;
		if(first == 0)
			first = bp;
		else
			last->next = bp;
		last = bp;
		if(i == rem)
			break;
.
1295,1302c
	first = last = 0;
	for(rem = n; ; rem -= i) {
		bp = allocb(rem);
		i = bp->lim - bp->wptr;
		if(i >= rem)
			i = rem;
		memmove(bp->wptr, a, i);
		bp->wptr += i;
.
1292,1293c
	 *  copy the whole write into kernel space
.
1270a
	Block *bp, *first, *last;
.
1267d
811d
794d
## diffname port/stream.c 1991/1012
## diff -e /n/bootesdump/1991/0926/sys/src/9/port/stream.c /n/bootesdump/1991/1012/sys/src/9/port/stream.c
96a
	if(qi->next)
		panic("newqinfo: already configured");

.
## diffname port/stream.c 1991/1027
## diff -e /n/bootesdump/1991/1012/sys/src/9/port/stream.c /n/bootesdump/1991/1027/sys/src/9/port/stream.c
1327,1331c
getfields(char *lp, char **fields, int n, char sep)
.
## diffname port/stream.c 1991/1105
## diff -e /n/bootesdump/1991/1027/sys/src/9/port/stream.c /n/bootesdump/1991/1105/sys/src/9/port/stream.c
1193c
 *  This routine is entered with s->wrlock held and must unlock it.
.
## diffname port/stream.c 1991/1107
## diff -e /n/bootesdump/1991/1105/sys/src/9/port/stream.c /n/bootesdump/1991/1107/sys/src/9/port/stream.c
1420c
		qunlock(bcp);
.
1417c
		qlock(bcp);
.
619a
 *  expand a block list to be one block, len bytes long
 */
Block*
expandb(Block *bp, int len)
{
	Block *nbp, *new;
	int i;

	/*
	 *  Returns a single new block holding exactly len bytes: the
	 *  leading bytes of the list bp, zero filled at the end when the
	 *  list is shorter than len.  bp is always passed to freeb; 0 is
	 *  returned when allocb yields no block.
	 */
	new = allocb(len);
	if(new == 0){
		freeb(bp);
		return 0;
	}

	/*
	 *  copy bytes into new block.  Each step takes the length of
	 *  the block being copied (nbp, not the head of the list),
	 *  clipped to what is still wanted, so len reaches 0 exactly
	 *  when the new block is full.
	 */
	for(nbp = bp; len>0 && nbp; nbp = nbp->next){
		i = BLEN(nbp);
		if(i > len)
			i = len;
		memmove(new->wptr, nbp->rptr, i);
		new->wptr += i;
		len -= i;
	}

	/*
	 *  the list ran out first: zero the remainder
	 */
	if(len > 0){
		memset(new->wptr, 0, len);
		new->wptr += len;
	}
	freeb(bp);
	return new;
}

/*
.
252c
		qunlock(bcp);
.
243c
		qlock(bcp);
.
209c
	qunlock(bcp);
.
199c
			qunlock(bcp);
.
196c
	qlock(bcp);
.
152c
			qunlock(bclass);
.
148c
			qlock(bclass);
.
46a
	QLock;
.
## diffname port/stream.c 1991/1109
## diff -e /n/bootesdump/1991/1107/sys/src/9/port/stream.c /n/bootesdump/1991/1109/sys/src/9/port/stream.c
1410c
	devdir(c, c->qid, name, n, eve, 0, &dir);
.
790,791c
	devdir(c, (Qid){STREAMQID(STREAMID(c->qid.path),tab->qid.path), 0}, 
		tab->name, tab->length, eve, tab->perm, dp);
.
## diffname port/stream.c 1991/1115
## diff -e /n/bootesdump/1991/1109/sys/src/9/port/stream.c /n/bootesdump/1991/1115/sys/src/9/port/stream.c
1095c
		return stringread(buf, n, num, c->offset);
.
1068c
stringread(uchar *buf, long n, char *str, ulong offset)
.
691a
	USED(q);
.
## diffname port/stream.c 1991/1121
## diff -e /n/bootesdump/1991/1115/sys/src/9/port/stream.c /n/bootesdump/1991/1121/sys/src/9/port/stream.c
653a
	new->flags |= delim;
.
639c
		delim = nbp->flags & S_DELIM;
		i = BLEN(nbp);
.
627a
	ulong delim = 0;
.
## diffname port/stream.c 1991/1122
## diff -e /n/bootesdump/1991/1121/sys/src/9/port/stream.c /n/bootesdump/1991/1122/sys/src/9/port/stream.c
217a
	bp->list = 0;
.
## diffname port/stream.c 1991/1126
## diff -e /n/bootesdump/1991/1122/sys/src/9/port/stream.c /n/bootesdump/1991/1126/sys/src/9/port/stream.c
543c
		if((q->flag&QHIWAT) && q->len<Streamhi/2 && q->nb<Streambhi/2 &&q->other){
.
## diffname port/stream.c 1991/1227
## diff -e /n/bootesdump/1991/1126/sys/src/9/port/stream.c /n/bootesdump/1991/1227/sys/src/9/port/stream.c
1009,1010d
977,980c
			if(!waserror()){
.
971d
965d
## diffname port/stream.c 1992/0101
## diff -e /n/bootesdump/1991/1227/sys/src/9/port/stream.c /n/bootesdump/1992/0101/sys/src/9/port/stream.c
119c
	page->va = VA(kmapperm(page));
.
## diffname port/stream.c 1992/0111
## diff -e /n/bootesdump/1992/0101/sys/src/9/port/stream.c /n/bootesdump/1992/0111/sys/src/9/port/stream.c
7c
#include	"../port/error.h"
.
## diffname port/stream.c 1992/0114
## diff -e /n/bootesdump/1992/0111/sys/src/9/port/stream.c /n/bootesdump/1992/0114/sys/src/9/port/stream.c
1325c
			error((char*)(s->err->rptr));
.
1220c
	error(Ebadarg);
.
1162c
					error((char*)s->err->rptr);
.
823c
		exhausted("streams");
.
299c
		exhausted("queues");
.
## diffname port/stream.c 1992/0207
## diff -e /n/bootesdump/1992/0114/sys/src/9/port/stream.c /n/bootesdump/1992/0207/sys/src/9/port/stream.c
1169c
			sleep(q->rp, isinput, (void *)q);
.
## diffname port/stream.c 1992/0222
## diff -e /n/bootesdump/1992/0207/sys/src/9/port/stream.c /n/bootesdump/1992/0222/sys/src/9/port/stream.c
239,240c
	pc = getcallerpc(((uchar*)&bp) - sizeof(bp));
	if((bp->flags&S_CLASS) >= Nclass)		/* Check for double free */
		panic("freeb class last(%lux) this(%lux)", bp->pc, pc);
	bp->pc = pc;
.
237a
	ulong pc;
.
## diffname port/stream.c 1992/0305
## diff -e /n/bootesdump/1992/0222/sys/src/9/port/stream.c /n/bootesdump/1992/0305/sys/src/9/port/stream.c
1359,1360c
	FLOWCTL(q, first);
.
1296,1299c
	PUTNEXT(q, bp);
.
1294c
		poperror();
.
1291,1292c
	if(bp->type != M_HANGUP){
		qlock(&q->rlock);
		if(waserror()){
			qunlock(&q->rlock);
			freeb(bp);
			nexterror();
		}
		q->rp = &q->r;
		sleep(q->rp, notfull, q->next);
.
1289c
flowctl(Queue *q, Block *bp)
.
## diffname port/stream.c 1992/0318
## diff -e /n/bootesdump/1992/0305/sys/src/9/port/stream.c /n/bootesdump/1992/0318/sys/src/9/port/stream.c
1415a
}

Block *
copyb(Block *bp, int count)
{
	Block *nb, *head, **p;
	int l;

	p = &head;
	while(count) {
		l = BLEN(bp);
		if(count < l)
			l = count;
		nb = allocb(l);
		if(nb == 0)
			panic("copyb.1");
		memmove(nb->wptr, bp->rptr, l);
		nb->wptr += l;
		count -= l;
		if(bp->flags & S_DELIM)
			nb->flags |= S_DELIM;
		*p = nb;
		p = &nb->next;
		bp = bp->next;
		if(bp == 0)
			break;
	}
	if(count) {
		nb = allocb(count);
		if(nb == 0)
			panic("copyb.2");
		memset(nb->wptr, 0, count);
		nb->wptr += count;
		nb->flags |= S_DELIM;
		*p = nb;
	}
	if(blen(head) == 0)
		print("copyb: zero length\n");

	return head;
.
1262a
		if(qi == 0)
			error(Ebadld);
.
721c
	return 0;
.
717c
		return 0;
.
711c
Qinfo *
.
## diffname port/stream.c 1992/0319
## diff -e /n/bootesdump/1992/0318/sys/src/9/port/stream.c /n/bootesdump/1992/0319/sys/src/9/port/stream.c
1510a
	for(i = 0; i < 100; i++) {
		if(refsa[i] != refsf[i])
			print("%d alloc %lux %d free %lux\n", refsa[i], apcs[i], refsf[i], fpcs[i]);
	}
.
1497a
			if(q->pg)
				print("get %d %s ", q->pg->pid, q->pg->text);
			if(q->pp)
				print("put %d %s ", q->pp->pid, q->pp->text);
		print("\n");
.
1495c
			if(q->pg)
				print("get %d %s ", q->pg->pid, q->pg->text);
			if(q->pp)
				print("put %d %s ", q->pp->pid, q->pp->text);
		print("  W n %d l %d f %ux r %ux next %lux put %lux Rz %lux", 
.
1493c
		print("%10s %ux  R n %d l %d f %ux r %ux ",
			q->info->name, q,
.
1486c
int i;
.
1471,1473c
		print("%c %c%d%c", c, bp->type == M_DATA ? 'd' : 'c',
			bp->wptr-bp->rptr, (bp->flags&S_DELIM)?'D':' ');
		for(cp = bp->rptr; cp<bp->wptr && cp<bp->rptr+30; cp++)
			print(" %.2x", *cp);
.
1040a
		q->pp = u->p;
.
538a
	q->pg = u->p;
.
426a
	q->pp = u->p;
.
242a

	bcp = &bclass[bp->flags & S_CLASS];
	if(bcp->size == 68) {
		for(i = 0; i < 100; i++)
			if(apcs[i] == bp->pc) {
				refsf[i]++;
				fpcs[i] = pc;
			}
	}

.
238a
int i;
.
223a
	bp->pc = pc;
.
209a

	if(bcp->size == 68) {
		int i, hole, fnd;
		fnd = 0;
		hole = -1;
		for(i = 0; i < 100; i++) {
			if(apcs[i] == pc) {
				refsa[i]++;
				fnd = 1;
				break;
			}
			if(refsa[i] == 0 && hole<0)
				hole = i;
		}
		if(fnd == 0 && hole>=0) {
			refsa[hole] = 1;
			apcs[hole] = pc;
		}
	}
.
187a
	pc = getcallerpc(((uchar*)&size) - sizeof(size));

.
186a
	ulong pc;
.
10a
ulong fpcs[100], refsa[100], refsf[100], apcs[100];

.
## diffname port/stream.c 1992/0321
## diff -e /n/bootesdump/1992/0319/sys/src/9/port/stream.c /n/bootesdump/1992/0321/sys/src/9/port/stream.c
2c
#include	"../port/lib.h"
.
## diffname port/stream.c 1992/0326
## diff -e /n/bootesdump/1992/0321/sys/src/9/port/stream.c /n/bootesdump/1992/0326/sys/src/9/port/stream.c
1561,1564d
1543,1546d
1536,1539d
1526c

.
1079d
576d
463d
269,278d
264d
248d
215,233d
191,192d
189d
11,12d
## diffname port/stream.c 1992/0509
## diff -e /n/bootesdump/1992/0326/sys/src/9/port/stream.c /n/bootesdump/1992/0509/sys/src/9/port/stream.c
1509d
1506d
## diffname port/stream.c 1992/0520
## diff -e /n/bootesdump/1992/0509/sys/src/9/port/stream.c /n/bootesdump/1992/0520/sys/src/9/port/stream.c
1102a
	return 0;	/* not reached */
.
## diffname port/stream.c 1992/0529
## diff -e /n/bootesdump/1992/0520/sys/src/9/port/stream.c /n/bootesdump/1992/0529/sys/src/9/port/stream.c
625c
 *  expand a block list to be one block, len bytes long
.
607c
		if(i >= n) {
.
## diffname port/stream.c 1992/0603
## diff -e /n/bootesdump/1992/0529/sys/src/9/port/stream.c /n/bootesdump/1992/0603/sys/src/9/port/stream.c
244c
#endif asdf
.
239a
#ifdef asdf
.
220,223d
174,181d
## diffname port/stream.c 1992/0609
## diff -e /n/bootesdump/1992/0603/sys/src/9/port/stream.c /n/bootesdump/1992/0609/sys/src/9/port/stream.c
545a
 *  grab all the blocks in a queue
 */
Block *
grabq(Queue *q)
{
	Block *head;

	/*
	 *  Detach the whole block list from q under its lock and hand
	 *  it to the caller; 0 is returned when the queue is empty.
	 */
	lock(q);
	head = q->first;
	if(head == 0){
		unlock(q);
		return 0;
	}

	q->first = 0;
	q->last = 0;
	q->len = 0;
	q->nb = 0;

	/*
	 *  the queue was marked QHIWAT: wake the sleeper and clear the mark
	 */
	if((q->flag & QHIWAT) != 0){
		wakeup(q->other->next->other->rp);
		q->flag &= ~QHIWAT;
	}
	unlock(q);
	return head;
}

/*
.
## diffname port/stream.c 1992/0619
## diff -e /n/bootesdump/1992/0609/sys/src/9/port/stream.c /n/bootesdump/1992/0619/sys/src/9/port/stream.c
1519,1525d
1500d
228,246c
	while(bp){
		bp->rptr = 0;
		bp->wptr = 0;
		next = bp->next;
		free(bp);
		bp = next;	
.
222,226c
	Block *next;
.
186,208c
	data = (uchar*)bp + sizeof(Block);
	bp->rptr = data;
	bp->wptr = data;
	bp->base = data;
	bp->lim = data+size;
	bp->flags = 0;
.
180,184c
	bp = smalloc(sizeof(Block)+size);
.
178c
	uchar *data;
.
108,171d
75,85d
71,72c
	slist = (Stream *)xalloc(conf.nstream * sizeof(Stream));
	qlist = (Queue *)xalloc(conf.nqueue * sizeof(Queue));
.
65,66d
41,57d
33,35d
9d
## diffname port/stream.c 1992/0623
## diff -e /n/bootesdump/1992/0619/sys/src/9/port/stream.c /n/bootesdump/1992/0623/sys/src/9/port/stream.c
1361,1381d
1355c
	return i;
.
1347,1353c
	for(i=0; lp && *lp && i<n; i++){
		while(*lp == sep)
			*lp++=0;
		if(*lp == 0)
			break;
		fields[i]=lp;
		while(*lp && *lp != sep)
			lp++;
.
1344,1345c
	int i;
.
1341,1342c
int
getfields(char *lp, char **fields, int n, char sep)
.
1339c
 *  like andrew's getmfields but no hidden state
.
1335c
/*
 *  parse a string and return a pointer to the second element if the 
 *  first matches name.  bp->rptr will be updated to point to the
 *  second element.
 *
 *  return 0 if no match.
 *
 *  it is assumed that the block data is null terminated.  streamwrite
 *  guarantees this.
 */
int
streamparse(char *name, Block *bp)
{
	int len;

	len = strlen(name);
	if(BLEN(bp) < len)
		return 0;
	if(strncmp(name, (char *)bp->rptr, len)==0){
		if(bp->rptr[len] == ' ')
			bp->rptr += len+1;
		else if(bp->rptr[len])
			return 0;
		else
			bp->rptr += len;
		while(*bp->rptr==' ' && bp->wptr>bp->rptr)
			bp->rptr++;
		return 1;
	}
	return 0;
.
1304,1333c
	bp = allocb(0);
	bp->type = M_HANGUP;
	(*s->devq->put)(s->devq, bp);
}
.
1301,1302c
	Block *bp;
.
1298,1299c
/*
 *  send a hangup up a stream
 */
static void
hangup(Stream *s)
.
1248,1267d
973c
		return readstr(c->offset, buf, n, num);
.
966c
	char *buf = vbuf;
.
943,960d
883d
877c
	rv = s->opens;
	qunlock(s);
.
850c
	if(s->opens-- == 1){
.
847c
	 *  decrement the open count
.
836,837c
 *  Decrement the open count.  When it goes to zero, call the close
 *  routines for each queue in the stream.
.
828,832c
	qunlock(hb);
.
826a

		/*
		 *  unchain it from the hash bucket and free
		 */
		l = &hb->s;
		for(ns = hb->s; ns; ns = ns->next){
			if(s == ns){
				*l = s->next;
				break;
			}
			l = &ns->next;
		}
		free(s);
.
824d
811,813c
	hb = hash(s->type, s->dev, s->id);
	qlock(hb);
	if(s->inuse-- == 1){
.
809a
	Sthash *hb;
	Stream **l, *ns;
.
808d
803c
void
.
789,796c
	Sthash *hb;
	Stream *ns;

	hb = hash(s->type, s->dev, s->id);
	qlock(hb);
	for(ns = hb->s; ns; ns = ns->next)
		if(s->type == ns->type && s->dev == ns->dev && s->id == ns->id){
			s->inuse++;
			qunlock(hb);
			if(s->opens == 0){
				streamexit(s, 1);
				return -1;
			}
			return 0;
		}
	qunlock(hb);
	return -1;
.
783,784c
 *  Enter a stream only if the stream exists and is open.  Increment the
 *  reference count so it can't disappear under foot.
 *
 *  Return -1 if the stream no longer exists or is not opened.
.
753,778d
748c
 *  Associate a stream with a channel
.
727d
722d
719a
	 *  The ordering of these 2 instructions is very important.
	 *  It makes sure we finish the stream initialization before
	 *  anyone else can access it.
	 */
	qlock(s);
	qunlock(hb);

	if(waserror()){
		qunlock(s);
		streamclose1(s);
		nexterror();
	}

	/*
.
717a
	s->hread = 0;
	s->next = hb->s;
	hb->s = s;
.
713a
	s = smalloc(sizeof(Stream));
	s->inuse = 1;
.
712c
	 *  create and init a new stream
.
701,709d
698a
			return s;
.
692,696c
	qlock(hb);
	for(s = hb->s; s; s = s->next) {
		if(s->type == type && s->dev == dev && s->id == id){
			s->inuse++;
			qunlock(hb);
			if(noopen == 0){
				qlock(s);
				s->opens++;
.
690c
	 *  if the stream already exists, just increment the reference counts.
.
688a
	hb = hash(type, dev, id);

.
687a
	Sthash *hb;
.
680a
 *  return a hash bucket for a stream
 */
static Sthash*
hash(int type, int dev, int id)
{
	return &ht[(type*7*7 + dev*7 + id) & Nmask];
}

/*
.
647,654d
629,643c
static void	hangup(Stream*);

void
streaminit(void)
{
	/*
	 *  make stream modules available
	 */
	streaminit0();
.
627c
	Nbits=	5,
	Nhash=	1<<Nbits,
	Nmask=	Nhash-1,
};
typedef struct Sthash Sthash;
struct Sthash
{
	QLock;			/* taken with qlock(hb) before the chain is walked or edited */
	Stream	*s;		/* first stream in this bucket; the rest follow via Stream.next */
};
static Sthash ht[Nhash];	/* the table itself; hash() picks the bucket */
.
624,625c
enum
.
615,622c
 *  hash buckets containing all streams
.
609,613d
604,607c
Dirtab streamdir[]={
	"data",		{Sdataqid},	0,			0600,
	"ctl",		{Sctlqid},	0,			0600,
};
.
602c
 *  the per stream directory structure
.
593,600d
588,591d
586c
 *  Part 3) Streams
.
502,570d
461,500d
452,459d
301,346d
211d
201c
flushq(Queue *q)
.
198c
 *  flush a queue
.
194a
	free(RD(q));
.
185c
freeq(Queue *q)
.
182c
 *  free a queue
.
176,177d
153,157d
143,151c
	q = smalloc(2*sizeof(Queue));
.
134a
 *  make sure the first block has n bytes
 */
Block *
pullup(Block *bp, int n)
{
	Block *src;
	int have;

	/*
	 *  the usual case: the head block already holds n bytes.
	 *  Testing here saves every caller from doing it.
	 */
	if(n <= BLEN(bp))
		return bp;

	/*
	 *  if n bytes cannot fit between rptr and lim of the
	 *  head block, chain a new n byte block in front of it.
	 */
	if(n > bp->lim - bp->rptr){
		src = allocb(n);
		src->next = bp;
		bp = src;
	}

	/*
	 *  move bytes up from the following blocks until the head has n.
	 *  Emptied blocks are unlinked and freed; if the chain runs out
	 *  first, the head is freed too and 0 is returned.
	 */
	n -= BLEN(bp);
	while((src = bp->next) != 0){
		have = BLEN(src);
		if(have >= n) {
			memmove(bp->wptr, src->rptr, n);
			bp->wptr += n;
			src->rptr += n;
			return bp;
		}
		memmove(bp->wptr, src->rptr, have);
		bp->wptr += have;
		bp->next = src->next;
		src->next = 0;
		freeb(src);
		n -= have;
	}
	freeb(bp);
	return 0;
}

/*
 *  return the number of data bytes of a list of blocks
 */
int
blen(Block *bp)
{
	int total;
	Block *b;

	/* add up the data bytes of every block on the chain */
	total = 0;
	for(b = bp; b != 0; b = b->next)
		total += BLEN(b);

	return total;
}

/*
 *  round a block chain up to a multiple of amount+1 bytes (amount is
 *  used as a bit mask, so 1 gives an even length).  Used by devip.c
 *  because all IP packets must have an even number of bytes.
 *
 *  The padding is zeroed.  If a block has to be appended to hold it,
 *  S_DELIM is cleared on the old last block and set on the new one.
 */
int
bround(Block *bp, int amount)
{
	Block *last;
	int len, pad;

	len = 0;
	SET(last);	/* Ken's magic */

	while(bp) {
		len += BLEN(bp);
		last = bp;
		bp = bp->next;
	}

	pad = ((len + amount) & ~amount) - len;	/* 0 whenever len is 0 */
	if(pad) {
		if(last->lim - last->wptr < pad){
			/* no room left in the last block: chain on a new one */
			last->next = allocb(pad);
			last->flags &= ~S_DELIM;
			last = last->next;
			last->flags |= S_DELIM;
		}
		memset(last->wptr, 0, pad);	/* zero the pad, in place or in the new block */
		last->wptr += pad;
	}

	return len + pad;	/* the rounded length */
}

/*
 *  expand a block list to be one block, len bytes long.  used by
 *  ethernet routines.  Data beyond len bytes is dropped; a short list
 *  is zero padded.  The new block gets the S_DELIM state of the last
 *  block copied from.  The old list is handed to freeb(); 0 is
 *  returned if no block can be allocated.
 */
Block*
expandb(Block *bp, int len)
{
	Block *nbp, *new;
	int i;
	ulong delim = 0;

	new = allocb(len);
	if(new == 0){
		freeb(bp);
		return 0;
	}

	/*
	 *  copy bytes into new block.  i is clamped to the room left so
	 *  that len drops to 0 when the data is truncated; otherwise the
	 *  padding step below would write len more bytes past the end.
	 */
	for(nbp = bp; len>0 && nbp; nbp = nbp->next){
		delim = nbp->flags & S_DELIM;
		i = BLEN(nbp);
		if(i > len)
			i = len;
		memmove(new->wptr, nbp->rptr, i);
		new->wptr += i;
		len -= i;
	}
	if(len > 0){	/* the list ran out first: zero fill the rest */
		memset(new->wptr, 0, len);
		new->wptr += len;
	}
	new->flags |= delim;
	freeb(bp);
	return new;
}

/*
 *  make a copy of the first 'count' bytes of a block chain.  Used by
 *  transport protocols.  A chain shorter than count is made up with a
 *  zeroed S_DELIM block; a count <= 0 copies nothing and returns 0.
 */
Block *
copyb(Block *bp, int count)
{
	Block *nb, *head, **p;
	int l;

	head = 0;	/* so an empty copy returns 0 rather than stack garbage */
	p = &head;
	while(count > 0 && bp) {
		l = BLEN(bp);
		if(count < l)
			l = count;
		nb = allocb(l);
		if(nb == 0)
			panic("copyb.1");
		memmove(nb->wptr, bp->rptr, l);
		nb->wptr += l;
		count -= l;
		if(bp->flags & S_DELIM)
			nb->flags |= S_DELIM;
		*p = nb;
		p = &nb->next;
		bp = bp->next;
	}
	if(count > 0) {
		nb = allocb(count);
		if(nb == 0)
			panic("copyb.2");
		memset(nb->wptr, 0, count);
		nb->wptr += count;
		nb->flags |= S_DELIM;
		*p = nb;
	}
	if(blen(head) == 0)
		print("copyb: zero length\n");

	return head;
}

/*
 *  Part 2) Queues
 */

/*
 *  process end line discipline
 */
static void stputq(Queue*, Block*);
Qinfo procinfo =
{
	stputq,
	nullput,
	0,
	0,
	"process"
};

/*
 *  line disciplines that can be pushed
 */
static Qinfo *lds;

/*
 *  make known a stream module and call its initialization routine, if
 *  it has one.
 */
void
newqinfo(Qinfo *qi)
{
	if(qi->next != 0)	/* already linked into the list */
		panic("newqinfo: already configured");
	qi->next = lds;		/* push on the front of the list of disciplines */
	lds = qi;
	if(qi->reset == 0)	/* the initialization routine is optional */
		return;
	(*qi->reset)();
}

/*
 *  find the info structure for line discipline 'name'
 */
Qinfo *
qinfofind(char *name)
{
	Qinfo *p;

	/* a nil name matches nothing; otherwise walk the list for an exact match */
	p = name ? lds : 0;
	while(p != 0 && strcmp(p->name, name) != 0)
		p = p->next;

	return p;
}

/*
.
115c
 *  Pad a block to the front with n bytes.  This is used to add protocol
 *  headers to the front of blocks.
.
98c
 *  someone trying to access it after freeing will cause a panic.
.
84,88c
	base = (uchar*)bp + sizeof(Block);
	lim = (uchar*)bp + msize(bp);
	bp->wptr = bp->rptr = lim - size;
	bp->base = base;
	bp->lim = lim;
.
80c
	uchar *base, *lim;
.
30,75d
28c
 *  Allocate a block.  Put the data portion at the end of the smalloc'd
 *  chunk so that it can easily grow from the front to add protocol
 *  headers.  Thank Larry Peterson for the suggestion.
.
17,25d
15c
 *  Part 1) Blocks
.
10,13d
## diffname port/stream.c 1992/0625
## diff -e /n/bootesdump/1992/0623/sys/src/9/port/stream.c /n/bootesdump/1992/0625/sys/src/9/port/stream.c
1325,1329d
1222,1227d
1220a
		}
.
1212,1219c
		va += i;
		rem -= i;
		if(rem > 0){
			FLOWCTL(q, bp);
		} else {
			bp->flags |= S_DELIM;
			FLOWCTL(q, bp);
.
1210c
		bp = allocb(i);
		memmove(bp->wptr, va, i);
.
1204,1208c
	va = a;
	rem = n;
	for(;;){
		if(rem > Streamhi)
			i = Streamhi;
		else
.
1202c
	 *  Write the message using blocks <= Streamhi bytes long
.
1180c
	Block *bp;
	char *va;
.
940c
	if(awaken)
.
936c
			awaken = 1;
.
931c
			awaken |= bp->flags & S_DELIM;
.
926c
		awaken = bp->flags & S_DELIM;
.
917c
		awaken = 1;
.
905c
	int awaken;
.
837a
 *  nail down a stream so that it can't be closed
 */
void
naildownstream(Stream *s)
{
	s->inuse += 1;	/* one more reference */
	s->opens += 1;	/* one more open */
}

/*
.
629,637d
278c
static Streamput stputq;
.
## diffname port/stream.c 1992/0711
## diff -e /n/bootesdump/1992/0625/sys/src/9/port/stream.c /n/bootesdump/1992/0711/sys/src/9/port/stream.c
1182a

	/*
	 *  docopy will get used if I ever figure out when to avoid copying
	 *  data. -- presotto
	 */
	USED(docopy);
.
886c
	streamexit(s);
.
846d
792d
788c
streamexit(Stream *s)
.
774c
				streamexit(s);
.
640,642d
## diffname port/stream.c 1992/0826
## diff -e /n/bootesdump/1992/0711/sys/src/9/port/stream.c /n/bootesdump/1992/0826/sys/src/9/port/stream.c
1257c
	devdir(c, c->qid, name, n, eve, perm, &dir);
.
1235c
streamstat(Chan *c, char *db, char *name, long perm)
.
## diffname port/stream.c 1993/0511
## diff -e /n/bootesdump/1992/0826/sys/src/9/port/stream.c /n/fornaxdump/1993/0511/sys/src/brazil/port/stream.c
955,1323c
		memmove(p, b->rp, len);
.
936,953c
	n = BLEN(b);
	if(n < len){
		memmove(p, b->rp, n);
.
934a
		return -1;
.
844,933c
	lock(q);
	b = q->first;
	if(b == 0){
		q->state |= Qcsleep;
.
841,842c
	Block *b;
	int n;
.
839c
consume(Queue *q, uchar *p, int len, int drop)
.
827,837d
825c
 *  copy out of a queue, returns # bytes copied
.
780,821c
	return b;
.
764,778c
	b->base = (uchar*)(b+1);
	b->rp = b->wp = b->base;
	b->lim = b->base + size;
.
752,762c
	b = alloc(sizeof(Block) + size);
	if(b == 0)
		exhausted("blocks");
.
749,750c
	Block *b;
.
743,747c
Block*
allocb(int size)
.
671,740c
	q = smalloc(sizeof(Queue));
	q->limit = limit;
.
669d
615,667d
601,613c
Queue*
allocq(int limit)
.
599c
 *  allocate queues and blocks
.
589,595c
	/* start garbage collector */
	kproc("buffer", bgc, 0);
.
587c
blockinit(void)
.
582,585d
569,579c
	lock(&freed);
	b->next = freed->first;
	freed->first = b;
	unlock(&freed);
.
563,567c
void
freeb(Block *b)
.
559,560d
382,556c
		for(; b; b = nb){
			nb = b->next;
			free(b);
.
374,380c
		x = slphi();
		lock(&freed);
		b = freed->first;
		freed->first = freed->last = 0;;
		unlock(&freed);
		spllo();
.
365,372c
	USED(arg);
	for(;;){
		tsleep(&freed->r, return0, 0, 500);
		if(freed->first == 0)
			continue;
.
363c
	Block *b, *nb;
.
361c
bgc(void *arg)
.
19,359d
15,17c
 *  Interrupt handlers use freeb() to release blocks.  They are
 *  garbage collected by the kproc running bgc().
.
10,12c
static Queue *freed;
.
8d
6d
1d
## diffname port/stream.c 1993/0512
## diff -e /n/fornaxdump/1993/0511/sys/src/brazil/port/stream.c /n/fornaxdump/1993/0512/sys/src/brazil/port/stream.c
99,102c
	if(n < len)
		len = n;
	memmove(p, b->rp, len);
	if(len == n || drop){
		q->first = b->next;
		ifree(b);
	} else
		b->rp += len;
	unlock(q);
	return len;
}

int
produce(Queue *q, uchar *p, int len)
{
	Block *b;

	b = ialloc(sizeof(Block)
.
94c
		q->state |= Qstarve;
.
83c
 *  Interrupt level copy out of a queue, return # bytes copied.  If drop is
 *  set, any bytes left in a block after a consume are discarded.
.
65a
/*
 *  allocate queues and blocks
 */
.
62,63c
	cl = &arena.freed;
	p = a;
	lock(cl);
	p->next = cl->first;
	cl->first = p;
	unlock(cl);
.
60c
	Chunk *p;
	Chunkl *cl;
.
54,58c
void
ifree(void *a)
.
50,51c
	int pow;
	Chunkl *cl;
	Chunk *p;

	for(pow = Min; pow <= Maxpow; pow++)
		if(size <= (1<<pow)){
			cl = &arena.alloc[pow];
			lock(cl);
			p = cl->first;
			if(p){
				cl->have--;
				cl->first = p->next;
			}
			unlock(cl);
			return (void*)p;
		}
	panic("ialloc %d\n", size);
.
47,48c
void*
ialloc(int size)
.
41,44c
	int pow;
	Chunkl *cl;

	for(pow = Minpow; pow <= Maxpow; pow++){
		cl = &arena.alloc[pow];
		cl->goal = Maxpow-pow + 4;
	}

	/* start garbage collector */
	kproc("iallockproc", iallockproc, 0);
.
39c
iallocinit(void)
.
31,33c
		/* make sure we have blocks available for interrupt level */
		for(pow = Minpow; pow <= Maxpow; pow++){
			cl = &arena.alloc[pow];
			if(cl->have >= cl->goal){
				cl->had = cl->have;
				continue;
			}

			/* increase goal if we've been drained twice in a row */
			if(cl->have == 0 && cl->had == 0)
				cl->goal += cl->goal>>2;
			cl->had = cl->have;
			l = &first;
			for(i = x = cl->goal - cl->have; x > 0; x--){
				p = alloc(1<<pow);
				if(p == 0)
					break;
				*l = p;
				l = &p->next;
			}
			if(first){
				x = splhi();
				lock(cl);
				*l = cl->first;
				cl->first = first;
				cl->have += i;
				unlock(cl);
				spllo(x);
			}
.
24,29c
		/* really free what was freed at interrupt level */
		cl = &arena.freed;
		if(cl->first){
			x = slphi();
			lock(cl);
			first = cl->first;
			cl->first = 0;
			unlock(cl);
			spllo();
	
			for(; first; first = p){
				p = first->next;
				free(first);
			}
		}
.
21,22d
16c
	Chunk *p, *first, **l;
	Chunkl *cl;
	int pow, x, i;
.
14c
iallockproc(void *arg)
.
10,11c
 *  Manage interrupt level memory allocation.
.
8a
enum
{
	Minpow= 7,
	Maxpow=	12,
};

struct Chunk
{
	Chunk	*next;
};

struct Alloc
{
	Lock;
	Chunk	*first;
	int	had;
	int	goal;
	int	last;
};

struct Arena
{
	Chunkl	alloc[Maxpow-Minpow+1];
	Chunkl	freed;
};

static Arena arena;

.
7c
typedef struct Chunk	Chunk;
typedef	struct Chunkl	Chunkl;
typedef	struct Arena	Arena;
.
## diffname port/stream.c 1993/0513
## diff -e /n/fornaxdump/1993/0512/sys/src/brazil/port/stream.c /n/fornaxdump/1993/0513/sys/src/brazil/port/stream.c
207c
	lock(q);
	b = q->rfirst;
	if(b){
		/* hand to waiting receiver */
		n = b->lim - b->wp;
		if(n < len)
			len = n;
		memmove(b->wp, p, len);
		b->wp += len;
		q->rfirst = b->next;
		wakeup(&b->r);
		unlock(q);
		return len;
	}

	/* no waiting receivers, buffer */
	if(q->len >= q->limit)
		return -1;
	b = ialloc(sizeof(Block)+len);
	if(b == 0)
		return -1;
	b->base = (uchar*)(b+1);
	b->rp = b->base;
	b->wp = b->lim = b->base + len;
	memmove(b->rp, p, len);
	if(q->bfirst)
		q->blast->next = b;
	else
		q->bfirst = b;
	q->last = b;
	q->len += len;
	unlock(q);
	return len;
}

/*
 *  called by non-interrupt code
 */
Queue*
qopen(int limit)	/* allocate a queue whose flow control limit is 'limit' */
{
	Queue *q;

	q = malloc(sizeof(Queue));
	if(q == 0)
		exhausted("Queues");	/* presumably does not return -- TODO confirm */
	q->limit = limit;	/* q->len is compared against this by the producers */
}	/* NOTE(review): falls off the end without returning q */

static int
bfilled(void *a)
{
	Block *blk = a;

	return blk->wp - blk->rp;	/* condition passed to sleep(): nonzero once the block holds data */
}

long
qread(Queue *q, char *p, int len, int drop)	/* read up to len bytes from q into p */
{
	Block *b, *bb;	/* b: our own receive block; bb: first buffered block */
	int x, n;

	/* receive block for a producer to fill (... to be replaced by a mapping) */
	b = allocb(len);

	x = splhi();
	lock(q);
	bb = q->bfirst;
	if(bb == 0){
		/* nothing buffered: queue b as a waiting receiver and wait for it to be filled */
		if(q->rfirst)
			q->rlast->next = b;
		else
			q->rfirst = b;
		q->rlast = b;
		unlock(q);
		splx(x);
		sleep(&b->r, bfilled, b);
		n = BLEN(b);
		memmove(p, b->rp, n);
		return n;	/* NOTE(review): b is not freed on this path */
	}

	/* grab a block from the buffer */
	n = BLEN(b);	/* NOTE(review): this and the lines below use b, not the buffered block bb */
	if(drop || n <= len){
		q->bfirst = b->next;
		q->len -= n;
		unlock(q);
		slpx(x);	/* NOTE(review): presumably meant splx, as in the branch above */
		memmove(p, b->rp, n);
	} else {
		n = len;	/* caller's buffer is smaller: hand back only len bytes */
		q->len -= n;
		memmove(p, b->rp, n);
		b->rp += n;
		unlock(q);
		slpx(x);	/* NOTE(review): same spelling as above */
	}
	free(b);
	return n;	/* number of bytes copied into p */
}

static int
qnotfull(void *a)
{
	Queue *queue = a;

	return queue->len < queue->limit;	/* condition passed to sleep(): room below the limit */
}

long
qwrite(Queue *q, char *p, int len)
{
	Block *b;
	int x, n;

	b = allocb(len);
	memmove(b->rp, p, len);
	b->wp += len;

	/* flow control */
	if(!qnotfull(q)){
		qlock(&q->wlock);
		sleep(&q->r, qnotfull, q);
		qunlock(&q->wlock);
	}
		
	x = splhi();
	lock(q);
	if(q->bfirst)
		q->blast->next = b;
	else
		q->bfirst = b;
	q->blast = b;
	q->len += len;
	unlock(q);
	splx(x);

	return len;
.
198a

	if(drop || len == n)
		ifree(b);

.
197a
	q->len -= len;

	/* wakeup flow controlled writers */
	if(q->len+len >= q->limit && q->len < q->limit)
		wakeup(&q->r);

.
193,196c
	if(drop || len == n)
		q->bfirst = b->next;
	else
.
183c
	b = q->bfirst;
.
163c
		exhausted("Blocks");
.
161c
	b = malloc(sizeof(Block) + size);
.
83c
				p = malloc(1<<pow);
.
79a
			else {
				x = cl->goal/2;
				if(cl->goal > 4 && cl->had > x && cl->have > x)
					cl->goal--;
			}

.
77,78c
			/*
			 *  increase goal if we've been drained, decrease
			 *  goal if we've had lots of blocks twice in a row.
			 */
			if(cl->have == 0)
.
## diffname port/stream.c 1993/0515
## diff -e /n/fornaxdump/1993/0513/sys/src/brazil/port/stream.c /n/fornaxdump/1993/0515/sys/src/brazil/port/stream.c
243,254c
	b = q->first;
	if(append && b && b->lim-b->wp <= len){
		memmove(b->wp, p, len);
		b->wp += len;
	} else {
		b = ialloc(sizeof(Block)+len);
		if(b == 0)
			return -1;
		b->base = (uchar*)(b+1);
		b->rp = b->base;
		b->wp = b->lim = b->base + len;
		memmove(b->rp, p, len);
		if(q->bfirst)
			q->blast->next = b;
		else
			q->bfirst = b;
		q->last = b;
	}
.
221c
produce(Queue *q, uchar *p, int len, int append)
.
## diffname port/stream.c 1993/0522
## diff -e /n/fornaxdump/1993/0515/sys/src/brazil/port/stream.c /n/fornaxdump/1993/0522/sys/src/brazil/port/stream.c
367a
	if((q->state & Qstarve) && q->kick){
		q->stat &= ~Qstarve;
		(*q->kick)(q->arg);
	}
.
277a
	q->kick = kick;
	q->arg = arg;
.
270c
qopen(int limit, void (*kick)(void*), void *arg)
.
## diffname port/stream.c 1993/0525
## diff -e /n/fornaxdump/1993/0522/sys/src/brazil/port/stream.c /n/fornaxdump/1993/0525/sys/src/brazil/port/stream.c
371c
		q->state &= ~Qstarve;
.
349c
	int x;
.
331c
		splx(x);
.
327,329c
	q->len -= n;
	unlock(q);
	splx(x);
	memmove(p, bb->rp, n);
	bb->rp += n;

	/* free it or put it back */
	if(drop || bb->rp == bb->wp)
		free(bb);
	else {
		x = splhi();
		lock(q);
		bb->next = q->bfirst;
		q->bfirst = bb;
.
317,325c
	/* copy from a buffered block */
	q->bfirst = bb->next;
	n = BLEN(bb);
	if(n > len)
.
313a
		free(b);
.
296c
	/* ... to be replaced by a kmapping if need be */
.
291c
qread(Queue *q, char *p, int len)
.
279a
	q->state = Qmsg;

	return q;
.
259c
		q->blast = b;
.
250a
		}
.
249c
		if(b == 0){
			unlock(q);
.
243,244c
	}

	/* save in buffer */
	b = q->bfirst;
	if((q->state&Qmsg)==0 && b && b->lim-b->wp <= len){
.
240,241c
	/* no waiting receivers, room in buffer? */
	if(q->len >= q->limit){
		unlock(q);
.
223a
	int n;
.
221c
qproduce(Queue *q, uchar *p, int len)
.
214c
	if((q->state&Qmsg) || len == n)
.
208,209c
	/* wakeup flow controlled writers (with a bit of hysteresis) */
	if(q->len+len >= q->limit && q->len < q->limit/2)
.
202c
	if((q->state&Qmsg) || len == n)
.
186c
qconsume(Queue *q, uchar *p, int len)
.
145a
	return 0;			/* not reached */
.
133c
	for(pow = Minpow; pow <= Maxpow; pow++)
.
105c
				splx(x);
.
89c
			first = 0;
.
83,87d
75c
			} else
				cl->hist <<= 1;
.
73c
				cl->hist = ((cl->hist<<1) | 1) & 0xff;
				if(cl->hist == 0xff && cl->goal > 8)
					cl->goal--;
.
71a

			/*
			 *  if we've been ahead of the game for a while
			 *  start giving blocks back to the general pool
			 */
.
61c
			splx(x);
.
56c
			x = splhi();
.
51c
		tsleep(&arena.r, return0, 0, 500);
.
34a
	Rendez r;
.
33c
	Chunkl	alloc[Maxpow+1];
.
28c
	int	hist;
.
26c
	int	have;
.
22c
struct Chunkl
.
0a
#include	"u.h"
.
## diffname port/stream.c 1993/0526 # deleted
## diff -e /n/fornaxdump/1993/0525/sys/src/brazil/port/stream.c /n/fornaxdump/1993/0526/sys/src/brazil/port/stream.c
1,401d

Bell Labs OSI certified Powered by Plan 9

(Return to Plan 9 Home Page)

Copyright © 2021 Plan 9 Foundation. All Rights Reserved.
Comments to [email protected].