Plan 9 from Bell Labs’s /usr/web/sources/contrib/nemo/octopus/port/ofs/stop.b

Copyright © 2021 Plan 9 Foundation.
Distributed under the MIT License.
Download the Plan 9 distribution.


#
# Translation from styx to op for ofs.
# Either use the Cache or Op to operate on remote files.
#
# semantics:
#	dir (+initial data) fetched/checked upon walk
#	directories fully read.
#	dir hierarchies leading to files/dirs are created (faked) on demand.
#	files read on demand (after data read on first get)
#	creates deferred until first write, but for "mkdirs".
#	writethrough:
#		data pushed for write at 0; and partially filled puts
#	delayed write:
#		data pushed with full puts.
#
# All files with open fids are alive in the cache.
# Op's fds correspond to cached files. This means that a clunk is
# likely to clunk an op fd. Read and Write modes would use different fds.
#
# A coherency lag of N ms can be set. In this case,
# files checked out not more than Nms ago are considered
# up to date. For directories, this is tricky, because we must
# still read directories for which we got just metadata, and not data.
# in particular, directories invented during a walk.
#
# Each close in a file being written signals an end-of-put (clunk)
# in the server. However, this does not make any distinction between different
# client processes updating the same file in the server (which is reasonable, because
# in that case there would be races in any case).
implement Stop;

include "sys.m";
	sys: Sys;
	fprint, sprint, pctl, millisec, DMDIR, write, nulldir, tokenize, fildes,
	QTDIR, FD, Dir, Qid: import sys;
include "names.m";
	names: Names;
	cleanname: import names;
include "string.m";
	str: String;
	splitstrl: import str;
include "styx.m";
	styx: Styx;
	unpackdir: import styx;
include "styxservers.m";
	Enotfound, Eexists: import Styxservers;
include "op.m";
	OSTAT, NOFD, MAXDATA, OCREATE, ODATA, OMORE, Tmsg, Rmsg: import Op;
include "opmux.m";
	opmux: Opmux;
include "error.m";
	err: Error;
	stderr, panic: import err;
include "stop.m";
include "ofstree.m";
	ofstree: Ofstree;
	Cfile: import ofstree;

# To put a limit on the number of processes that we might spawn
# mainly because of Tputs for huge files.
Nprocs: con 30;

Xop: adt {
	fun:	ref fn(x: Xop);
	tag:	int;
	f:	ref Cfile;
	path:	string;
	d:	ref Dir;
	data:	array of byte;
	n:	int;
	o:	big;
	rc:	chan of ref Crep;
};

cfsreqc: chan of (ref Creq, chan of ref Crep);
xcfsreqc: chan of Xop;
xcfsendc: chan of int;
donec: chan of ref Cfile;
coherencylag := 0;

maxwaiting := 0;
maxheld := 0;

init(s: Styx, m: Opmux, dir: string, lag: int): string
{
	sys = load Sys Sys->PATH;
	err = load Error Error->PATH;
	err->init();
	names = load Names Names->PATH;
	str = load String String->PATH;
	opmux = m;
	ofstree = load Ofstree Ofstree->PATH;
	if (sys == nil || names == nil || str == nil || opmux == nil || ofstree == nil)
		return "load failed in cache";
		styx = s;
	coherencylag = lag;
	cfsreqc = chan of (ref Creq, chan of ref Crep);
	xcfsreqc = chan[5] of Xop;
	xcfsendc= chan of int;
	donec = chan[5] of ref Cfile;
	e := ofstree->init(sys, str, styx, err, names, dir);
	ofstree->debug = debug;
	if (e != nil)
		return e;
	spawn cfsproc();
	spawn xcfsproc();
	return nil;
}

term()
{
	cfsreqc <-= (nil, nil);
}

Creq.text(r: self ref Creq): string
{
	s := sprint("t%d q%bx", r.tag, r.qid);
	pick rr := r {
	Dump =>
		s += " dump";
	Remove =>
		s += " remove";
	Stat =>
		s += " stat";
	Sync =>
		s += " sync";
	Validate =>
		s += " validate [" + rr.path + "]";
	Walk1 =>
		s += " walk1 [" + rr.name + "]";
	Readdir =>
		s += sprint(" readdir c%d o%bd", rr.cnt, rr.off);
	Pread =>
		s += sprint(" pread c%d o%bd", rr.cnt, rr.off);
	Pwrite =>
		s += sprint(" pwrite [%3.3s...] c%d o%bd", string rr.data, len rr.data, rr.off);
	Wstat =>
		s += " wstat";
	Create =>
		s += " create [" + rr.d.name + "]";
	Flush =>
		s += sprint(" flush %d", rr.oldtag);
	}
	return s;
}

Crep.text(r: self ref Crep): string
{
	s := "";
	pick rr := r {
	Remove =>
		s += " remove";
	Stat =>
		s += " stat";
		if (rr.d != nil)
			s += sprint(" [%s] q%bx:%d", rr.d.name, rr.d.qid.path, rr.d.qid.vers);
	Sync =>
		s += " sync";
	Validate =>
		s += " validate";
		if (rr.d != nil)
			s += sprint(" [%s] q%bx:%d", rr.d.name, rr.d.qid.path, rr.d.qid.vers);
	Walk1 =>
		s += " walk1";
		if (rr.d != nil)
			s += sprint(" [%s] q%bx:%d", rr.d.name, rr.d.qid.path, rr.d.qid.vers);
	Readdir =>
		s += sprint(" readdir n%d", len rr.sons);
	Pread =>
		s += sprint(" pread [%3.3s...] c%d", string rr.data, len rr.data);
	Pwrite =>
		s += sprint(" pwrite c%d", rr.count);
	Wstat =>
		s += " wstat";
		if (rr.d != nil)
			s += sprint(" [%s] q%bx:%d", rr.d.name, rr.d.qid.path, rr.d.qid.vers);
	Create =>
		s += " create";
		if (rr.d != nil)
			s += sprint(" [%s] q%bx:%d", rr.d.name, rr.d.qid.path, rr.d.qid.vers);
	Flush =>
		s += sprint(" flush");
	}
	if (r.err != nil)
		s += " errstr=" + r.err;
	return s;
}

crepcs: list of chan of ref Crep;
getcrepc(): chan of ref Crep
{
	if (crepcs == nil)
		return chan of ref Crep;
	else {
		c := hd crepcs;
		crepcs = tl crepcs;
		return c;
	}
}
putcrepc(c: chan of ref Crep)
{
	crepcs = c::crepcs;
}

cacherpc(r: ref Creq) : ref Crep
{
	c := getcrepc();
	if (debug)
		fprint(stderr, "<c %d\t%s\n", millisec(), r.text());
	cfsreqc <-= (r, c);
	rep := <-c;
	putcrepc(c);
	if (debug)
		fprint(stderr, "c> %d\tt%d %s\n", millisec(), r.tag, rep.text());
	return rep;
}

dump()
{
	cacherpc(ref Creq.Dump(0, big 0));
}

validate(tag: int, qid: big, name: string): (ref Dir, string)
{
	r := cacherpc(ref Creq.Validate(tag, qid, name));
	pick rr := r {
	Validate =>
		return (rr.d, rr.err);
	}
	panic("validatebug");
	return (nil, nil);
}

create(tag: int, qid: big, d: ref Sys->Dir): (ref Sys->Dir, string)
{
	r := cacherpc(ref Creq.Create(tag, qid, ref *d));
	if (r.err == "invalid"){
		cacherpc(ref Creq.Validate(tag, qid, ""));
		r = cacherpc(ref Creq.Create(tag, qid, ref *d));
	}
	pick rr := r {
	Create =>
		return (rr.d, rr.err);
	}
	panic("createbug");
	return (nil, nil);
}

remove(tag: int, qid: big) : string
{
	r := cacherpc(ref Creq.Remove(tag, qid));
	pick rr := r {
	Remove =>
		return rr.err;
	}
	panic("removebug");
	return nil;
}

walk1(tag: int, qid: big, elem: string): (ref Sys->Dir, string)
{
	r := cacherpc(ref Creq.Walk1(tag, qid, elem));
	if (r.err == "invalid"){
		cacherpc(ref Creq.Validate(tag, qid, elem));
		r = cacherpc(ref Creq.Walk1(tag, qid, elem));
	}
	pick rr := r {
	Walk1 =>
		return (rr.d, rr.err);
	}
	panic("walk1bug");
	return (nil, nil);
}

readdir(tag: int, qid: big, cnt: int, off: int): (list of Sys->Dir, string)
{
	r := cacherpc(ref Creq.Readdir(tag, qid, cnt, big off));
	if (r.err == "invalid"){
		cacherpc(ref Creq.Validate(tag, qid, ""));
		r = cacherpc(ref Creq.Readdir(tag, qid, cnt, big off));
	}
	pick rr := r {
	Readdir =>
		return (rr.sons, rr.err);
	}
	panic("readdirbug");
	return (nil, nil);
}

pread(tag: int, qid: big, cnt: int, off: big): (array of byte, string)
{
	r := cacherpc(ref Creq.Pread(tag, qid, cnt, off));
	if (r.err == "invalid"){
		cacherpc(ref Creq.Validate(tag, qid, ""));
		r = cacherpc(ref Creq.Pread(tag, qid, cnt, off));
	}
	pick rr := r {
	Pread =>
		return (rr.data, rr.err);
	}
	panic("preadbug");
	return (nil, nil);
}

pwrite(tag: int, qid: big, data: array of byte, off: big) : (int, string)
{
	r := cacherpc(ref Creq.Pwrite(tag, qid, data, off));
	if (r.err == "invalid"){
		cacherpc(ref Creq.Validate(tag, qid, ""));
		r = cacherpc(ref Creq.Pwrite(tag, qid, data, off));
	}
	pick rr := r {
	Pwrite =>
		return (rr.count, rr.err);
	}
	panic("pwritebug");
	return (0, nil);
}

stat(tag: int, qid: big): (ref Sys->Dir, string)
{
	r := cacherpc(ref Creq.Stat(tag, qid));
	if (r.err == "invalid"){
		cacherpc(ref Creq.Validate(tag, qid, ""));
		r = cacherpc(ref Creq.Stat(tag, qid));
	}
	pick rr := r {
	Stat =>
		return (rr.d, rr.err);
	}
	panic("statbug");
	return (nil, nil);
}

wstat(tag: int, qid: big, d: ref Sys->Dir): (ref Sys->Dir, string)
{
	r := cacherpc(ref Creq.Wstat(tag, qid, ref *d));
	if (r.err == "invalid"){
		cacherpc(ref Creq.Validate(tag, qid, ""));
		r = cacherpc(ref Creq.Wstat(tag, qid, ref *d));
	}
	pick rr := r {
	Wstat =>
		return (rr.d, rr.err);
	}
	panic("wstatbug");
	return (nil, nil);
}

flush(tag: int, old: int): string
{
	r := cacherpc(ref Creq.Flush(tag, big 0, old));
	pick rr := r {
	Sync =>
		return rr.err;
	}
	panic("flushbug");
	return nil;
}

sync(tag: int, qid: big): string
{
	r := cacherpc(ref Creq.Sync(tag, qid));
	pick rr := r {
	Sync =>
		return rr.err;
	}
	panic("syncbug");
	return nil;
}

opfname(d, n: string) : string
{
	path: string;
	if (n == "" || n == "/")
		path = d;
	else if (d == "/")
		path = n;
	else
		path = d + "/" + n;
	path = cleanname(path);

	return path;
}

Wlist: type list of (ref Cfile, ref Creq, chan of ref Crep);

waitingop(l: Wlist, rf: ref Cfile): (Wlist, ref Creq, chan of ref Crep)
{
	rr: ref Creq;
	rrc: chan of ref Crep;

	rl : Wlist;
	for(; l != nil; l = tl l){
		(f, r, rc) := hd l;
		if (f == rf && rr == nil)
			(rr, rrc) = (r, rc);
		else
			rl = (f, r, rc) :: rl;
	}
	return (rl, rr, rrc);
}

dumpwaiting(l: Wlist)
{
	fprint(stderr, "\tcache: waiting:\n");
	for(; l != nil; l = tl l){
		(f, r, nil) := hd l;
		fprint(stderr, "\t\t%s: %s\n", f.d.name, r.text());
	}
}

# Main cache mux
# Requests are either being processed here, or spawned in auxiliary
# processes speaking op (and going through the opmux),
# or waiting for other requests to clear a file from being busy

cfsproc()
{
	waiting: Wlist;

	rootd := ref nulldir;
	rootd.name = "/";
	rootd.uid = rootd.gid = rootd.muid = "sys";
	rootd.qid = Qid(big 0, 0, QTDIR);
	root := Cfile.create(nil, rootd);
	if (root == nil)
		panic("cache: can't create root");
	if (debug)
		fprint(stderr, "cache: new: %s\n", root.text());
	readyfile: ref Cfile;
	taggen := 0;
Loop:
	for (;;){
		r: ref Creq;
		rc: chan of ref Crep;
		r = nil; rc = nil;
		# Attend first pending ops on a file that was busy.
		if (len waiting > maxwaiting)
			maxwaiting  = len waiting;
		if (readyfile != nil){
			(waiting, r, rc) = waitingop(waiting, readyfile);
			if (r != nil && debug)
				fprint(stderr, "\tcache: readyop: %s\n", r.text());
		}
		if (debug)
			dumpwaiting(waiting);
		if (r == nil){
			readyfile = nil;
			alt {
			(r, rc) = <-cfsreqc =>
				;
			f := <-donec =>
				f.busy = 0;
				readyfile = f;
				if (debug)
					fprint(stderr, "\tcache: %s ready\n", f.d.name);
				continue Loop;
			}
		}
		if (r == nil){
			if (debug) fprint(stderr, "mcache: stop\n");
			break;
		}
		f := Cfile.find(r.qid);
		if (f != nil && f.d == nil)
			panic(sprint("cache: nil dir for file 16r%bx", r.qid));
		pick rr := r {
		Flush =>
			xcfsreqc <-= Xop(xopflush, taggen++,nil,nil,nil,nil,rr.oldtag,big 0,rc);
		Validate =>
			if (f == nil){
				rc <-= ref Crep.Validate("invalid", nil);
				continue;
			} else if (f.busy) {
				if (debug) fprint(stderr, "\tcache: busy\n");
				waiting = (f, r, rc) :: waiting;
				continue;
			}

			# 1. Create a fake dir hierarchy for all but the last component.
			(nels, els) := tokenize(rr.path, "/");
			wf := f;
			for (i := 0 ; i < nels -1; i++){	# find or create a fake entry
				(wf, nil) = wf.walkorcreate(hd els, nil);
				if (wf != nil && wf.busy){
					if (debug) fprint(stderr, "\tcache: busy\n");
					waiting = (wf, r, rc) :: waiting;
					continue Loop;
				}
				els = tl els;
			}
			f = wf;

			# 2. try to validate the last component.
			# els could be nil (e.g., for "/"), or for a pure clone
			sf: ref Cfile;
			if (els == nil)
				sf = f;
			else {
				if (f == nil)
					panic("mcache validate bug");
				sf = f.walk(hd els);
			}
			# sf is either nil, or points to the file (and maybe sf == f)
			if (sf  != nil && sf.busy){
				if (debug) fprint(stderr, "\tcache: busy\n");
				waiting = (sf, r, rc) :: waiting;
				continue;
			}
			now := millisec();
			# This optimization does not seem to work in some cases
			if (0)
			if (sf == nil && f != sf && f.dirreaded && now - f.time < coherencylag){
				rc <-= ref Crep.Validate("file does not exist", nil);
				continue;
			}
			if (sf != nil && now - sf.time < coherencylag){
				rc <-= ref Crep.Validate(nil, ref *sf.d);
				continue;
			}
			# don't know. Ask the caller for help. Put the file on hold.
			isnew := 0;
			if (sf == nil){
				# BUG: This is a problem. The cache should create the file
				# only when necessary to keep cached data. Otherwise, we
				# create the file here as a directory and it might later be a file,
				# Things are fixed when the cache finds out the problem, but
				# it would be better to make it right from the start.
				(sf, isnew) = f.walkorcreate(hd els, nil);
			}
			if (sf.busy){
				if (debug) fprint(stderr, "\tcache: busy\n");
				waiting = (sf, r, rc) :: waiting;
				continue;
			}
			sf.busy = 1;
			path := sf.getpath();
			xcfsreqc <-= Xop(xgetandvalidate, taggen++, sf, path, nil, nil, isnew, big 0, rc);
		Create =>
			if (f == nil){
				rc <-= ref Crep.Create("invalid", nil);
				continue;
			} else if (f.busy) {
				if (debug) fprint(stderr, "\tcache: busy\n");
				waiting = (f, r, rc) :: waiting;
				continue;
			}
			(f,nil) = f.walkorcreate(rr.d.name, rr.d);
			if (f.busy){
				if (debug) fprint(stderr, "\tcache: busy\n");
				waiting = (f, r, rc) :: waiting;
				continue;
			}
			f.data = array[0] of byte;
			f.created = f.dirtyd = 1;
			if (f.d.qid.qtype&QTDIR){
				f.busy = 1;
				path := f.getpath();
				xcfsreqc <-= Xop(xputandcreate, taggen++, f, path, rr.d, nil, 0, big 0, rc);
			} else
				rc <-= ref Crep.Create(nil, ref *f.d);
		Remove =>
			if (f == nil){
				rc <-= ref Crep.Remove("invalid");
				continue;
			} else if (f.busy) {
				if (debug) fprint(stderr, "\tcache: busy\n");
				waiting = (f, r, rc) :: waiting;
				continue;
			}
			f.busy = 1;
			path := f.getpath();
			xcfsreqc <-= Xop(xdelandremove, taggen++, f, path, nil, nil, 0, big 0, rc);
		Walk1 =>
			if (f == nil){
				rc <-= ref Crep.Walk1("invalid", nil);
				continue;
			} else if (f.busy) {
				if (debug) fprint(stderr, "\tcache: busy\n");
				waiting = (f, r, rc) :: waiting;
				continue;
			}
			f = f.walk(rr.name);
			if (f != nil && f.busy){
				if (debug) fprint(stderr, "\tcache: busy\n");
				waiting = (f, r, rc) :: waiting;
				continue;
			}
			if (f == nil){
				# if this error ever happens, try to (re)validate this file
				rc <-= ref Crep.Walk1("invalid", nil);
			} else
				rc <-= ref Crep.Walk1(nil, ref *f.d);
		Readdir =>
			if (f == nil){
				rc <-= ref Crep.Readdir("invalid", nil);
				continue;
			} else if (f.busy) {
				if (debug) fprint(stderr, "\tcache: busy\n");
				waiting = (f, r, rc) :: waiting;
				continue;
			}
			rc <-= ref Crep.Readdir(nil, f.children(rr.cnt, int rr.off));
		Pread =>
			if (f == nil){
				rc <-= ref Crep.Pread("invalid", nil);
				continue;
			} else if (f.busy) {
				if (debug) fprint(stderr, "\tcache: busy\n");
				waiting = (f, r, rc) :: waiting;
				continue;
			}
			# we cache prefixes only, and don't believe that files
			# reporting zero length are indeed empty.
			if (rr.off >= f.d.length && f.d.length > big 0)
				rc <-= ref Crep.Pread(nil, array[0] of byte);
			else if (rr.off < big len f.data){
				n := int rr.off + rr.cnt;
				if (n >  len f.data)
					n = len f.data;
				rc <-= ref Crep.Pread(nil, f.data[int rr.off:n]);
			} else if (!(f.d.qid.qtype&QTDIR) && (cdata := f.pread(rr.cnt, rr.off)) != nil)
				rc <-= ref Crep.Pread(nil, cdata);
			else {
				path := f.getpath();
				f.busy = 1;
				xcfsreqc <-= Xop(xgetandread, taggen++, f, path, nil, nil, rr.cnt, rr.off, rc);
			}
		Pwrite =>
			# This is write-through (perhaps asynchronously)
			if (f == nil){
				rc <-= ref Crep.Pwrite("invalid", -1);
				continue;
			} else if (f.busy) {
				if (debug) fprint(stderr, "\tcache: busy\n");
				waiting = (f, r, rc) :: waiting;
				continue;
			}
			if (rr.off <= big len f.data){
				if (int rr.off + len rr.data > MAXCACHED){
					f.data = f.data[0:int rr.off];
				} else {
					off := int rr.off;
					ndata := array[off + len rr.data] of byte;
					ndata[0:] = f.data[0:off];
					ndata[off:] = rr.data;
					f.data = ndata;
					if (f.d.length < big len f.data)
						f.d.length = big len f.data;
				}
			}
			f.pwrite(rr.data, rr.off);
			path := f.getpath();
			f.busy = 1;
			# keep this condition in sync with async in putandwrite()
			l := len f.d.name;
			if ((rr.off != big 0 && (l > 4 && (f.d.name[l-4:l] == ".dis" || f.d.name[l-4:l] == ".sbl"))) ||
			   (len rr.data == 8192 && !f.created &&  rr.off != big 0))
				rc <-= ref Crep.Pwrite(nil, len rr.data);
			xcfsreqc <-= Xop(xputandwrite, taggen++, f, path, nil, rr.data, 0, rr.off, rc);
		Stat =>
			if (f == nil){
				rc <-= ref Crep.Stat("invalid", nil);
				continue;
			} else if (f.busy) {
				if (debug) fprint(stderr, "\tcache: busy\n");
				waiting = (f, r, rc) :: waiting;
				continue;
			}
			rc <-= ref Crep.Stat(nil, ref *f.d);
		Wstat =>
			if (f == nil){
				rc <-= ref Crep.Wstat("invalid", nil);
				continue;
			} else if (f.busy) {
				if (debug) fprint(stderr, "\tcache: busy\n");
				waiting = (f, r, rc) :: waiting;
				continue;
			}
			truncate := (f.d.length != big 0 && rr.d.length == big 0);
			if (truncate){
				f.data = array[0] of byte;
				f.created = 1;			# to truncate it
			}
			f.dirtyd  = 1;
			f.busy = 1;
			path := f.getpath();
			xcfsreqc <-= Xop(xputandwstat, taggen++, f, path, rr.d, nil, 0, big 0, rc);
		Sync =>
			if (f == nil){
				rc <-= ref Crep.Sync("invalid");
				continue;
			} else if (f.busy) {
				if (debug) fprint(stderr, "\tcache: busy\n");
				waiting = (f, r, rc) :: waiting;
				continue;
			}
			path := f.getpath();
			f.busy = 1;
			xcfsreqc <-= Xop(xputandsync, taggen++, f, path, nil, nil, 0, big 0, rc);
		Dump =>
			root.dump(0, "cfs tab:\n");
			fprint(stderr, "cache: %d max busy files, %d max held ops\n", maxwaiting, maxheld);
			rc <-= ref Crep.Dump(nil);
		}
	}
}

# BUG: cache xfuncs and put a reasonably high limit to the number of processes.
xcfsproc()
{
	pending: list of Xop;
	procs: list of chan of Xop;
	nprocs := 0;
	idlec := chan of chan of Xop;
	for(;;){
		alt {
		x := <-xcfsreqc =>
			if (x.fun== nil)
				exit;
			xc: chan of Xop;
			if (procs != nil){
				xc = hd procs;
				procs = tl procs;
			} else if (nprocs < Nprocs) {
				nprocs++;
				xc = chan of Xop;
				spawn xproc(xc, idlec);
			} else
				xc = nil;

			if (xc != nil)
				xc <-= x;
			else {
				pending = x :: pending;
				if (len pending > maxheld)
					maxheld = len pending;
				if (debug)
					fprint(stderr, "o/ofs: %d processes, op held\n", nprocs);
			}

		xc := <-idlec =>
			if (pending != nil){
				x := hd pending;
				pending = tl pending;
				if (debug)
					fprint(stderr, "o/ofs: held op released\n");
				xc <-= x;
			} else
				procs = xc::procs;
		}
	}
}

xproc(xc: chan of Xop, idlec: chan of chan of Xop)
{
	for(;;){
		x := <-xc;
		x.fun(x);
		idlec <-= xc;
	}
}

xopflush(nil: Xop)
{
	# opflush(tag, n, rc);	BUG: pretend it's flushed.
}
opflush(tag, oldtag: int, rc: chan of ref Crep)
{
	repc := opmux->rpc(ref Tmsg.Flush(2*tag, 2*oldtag));
	<-repc;
	rc <-= ref Crep.Flush(nil);
}

# BUG: Doing a ls -ld on a file that has NOT read permission will make Tget fail
# because we ask for data as well, and oxport would report a permission denied while trying to
# open the file for reading. This must be changed. If OSTAT|ODATA is asked to oxport,
# and only stat could be retrieved, only stat should be reported back, with just OSTAT in the
# reply. Of course, this means that there are problems reading file data, and the client may issue
# a Tget(ODATA) should it want to learn the cause (or probably assume just a permission denied).
get(tag: int, path: string, fd: int, mode: int, cnt: int, off: big): (int, ref Dir, array of byte, string)
{
	data := array[0] of byte;
	d : ref Dir;

	nm := 1;
	if (mode&ODATA){
		while(nm < MAXNMSGS && off + big (Op->MAXDATA*nm) < big MAXCACHED)
			nm++;
		if (nm > 1 || off < big MAXCACHED)
			cnt = MAXDATA;
	}
	req := ref Tmsg.Get(tag, path, fd, mode, nm, off, cnt);
	repc := opmux->rpc(req);
	nmsgs := nm;
	d = nil;
	for (;;){
		rep := <- repc;
		pick r := rep {
		Error =>
			return (Op->NOFD, nil, nil, r.ename);
		Get =>
			if ((r.mode&OSTAT) != 0){
				d = ref r.stat;
				if ((d.qid.qtype&QTDIR) != 0)
					nmsgs = 0;
			}
			if ((r.mode &ODATA) != 0){
				ndata := array[len data + len r.data] of byte;
				ndata[0:] = data;
				ndata[len data:] = r.data;
				data = ndata;
			}
			if (--nmsgs == 0 || (r.mode&OMORE) == 0)	# last message
				return(r.fd, d, data, nil);
		* =>
			panic("get0bug");
		}
	}
	panic("get0bug");
	return (Op->NOFD, nil, nil, "bug");
}

xgetandvalidate(x: Xop)
{
	getandvalidate(x.tag, x.f, x.path, x.n, x.rc);
}
getandvalidate(tag: int, f: ref Cfile, path: string, isnew: int, rc: chan of ref Crep)
{
		(fd, d, data, e) := get(tag, path, f.oprdfd, OSTAT|ODATA|OMORE, MAXDATA, big 0);
		f.oprdfd = fd;
		if (e != nil || d == nil){
			f.remove();
			rc <-= ref Crep.Validate(e, nil);
		}
		else {
			if (isnew)
				f.serverqid = d.qid;
			else {
				if (f.serverqid.path != d.qid.path || f.serverqid.vers != d.qid.vers)
				if (!f.created)
					f.data = array[0] of byte;
				f.serverqid = d.qid;
			}
			f.wstat(d);
			if (d.qid.qtype&QTDIR){
				f.dirreaded = 1;
				f.updatedirdata(data);
			} else {
				f.pwrite(data, big 0);
				if (!f.dirtyd && !f.created)
					f.data = data;
			}
			f.d.qid.vers = d.qid.vers;
			f.time = millisec();
			rc <-= ref Crep.Validate(nil, ref *f.d);
		}
		donec <-= f;
}

xgetandread(x: Xop)
{
	getandread(x.tag, x.f, x.path, x.n, x.o, x.rc);
}
getandread(tag: int, f: ref Cfile, path: string, cnt: int, off: big, rc: chan of ref Crep)
{
	(fd, nil, data, e) := get(tag, path, f.oprdfd, ODATA|OMORE, cnt, off);
	f.oprdfd = fd;
	if (e != nil){
		# fsremove(fstab, f.d.qid.path); should?
		rc <-= ref Crep.Pread(e, nil);
	} else {
		if (f.d.length != big 0){		# files with zero-length are probably streams.
			f.pwrite(data, off);	# do not cache.
			odata := f.data;
			o := int off;
			if (o <= len odata){
				if (o + len data > MAXCACHED)
					f.data = odata[0:o];
				else {
					ndata := array[o + len data] of byte;
					ndata[0:] = odata[0:o];
					ndata[o:] = data;
					f.data = ndata;
					if (f.d.length < big len f.data)
						f.d.length = big len f.data;
				}
			}
		}
		f.time = millisec();
		rc <-= ref Crep.Pread(nil, data);
	}
	donec <-= f;
}

# BUG: Too many args; put does it all.
put(tag: int, path: string, opfd: int, d: ref Dir, data: array of byte, off: big, mkit, more: int) : (int, int, int, string)
{
	if (d == nil && data == nil && opfd == Op->NOFD)
		return (NOFD, 0, 0, nil);
	flag := 0;
	if (d != nil)
		flag |= OSTAT;
	if (data != nil && (!mkit || (d.qid.qtype&QTDIR) == 0))
		flag |= ODATA;
	if (mkit)
		flag |= OCREATE;
	if (more)
		flag |= OMORE;
	if (data == nil)
		data = array[0] of byte;
	tot := 0;
	rfd := NOFD;
	mtime := 0;
	for(;;){
		nw := len data[tot:];
		if (nw > MAXDATA)
			nw = MAXDATA;
		if (d == nil)
			d = ref nulldir;
		rc := opmux->rpc(ref Tmsg.Put(tag, path, opfd, flag, *d, off, data[tot:tot+nw]));
		vers := 0;
		r := <- rc;
		pick rr := r {
		Put =>
			if (rr.count != nw)
				return (NOFD, 0, 0, "short write");
			vers = rr.qid.vers;
			rfd = rr.fd;
			mtime = rr.mtime;
		Error =>
			return (NOFD, 0, 0, rr.ename);
		* =>
			panic("putbug");
		}
		tot += nw;
		if (tot == len data)
			return (rfd, vers, mtime, nil);
		flag &= ~(OSTAT|OCREATE);
	}
	panic("putbug");
	return (NOFD, 0, 0, "putbug");
}

xputandcreate(x: Xop)
{
	putandcreate(x.tag, x.f, x.path, x.d, x.rc);
}
putandcreate(tag: int, f: ref Cfile, path: string, d: ref Dir, rc: chan of ref Crep)
{
	(fd, vers, mtime, e) := put(tag, path, f.opwrfd, f.d, nil, big 0, 1, 1);
	f.opwrfd = fd;
	if (e != nil){
		f.remove();
		rc <-= ref Crep.Create(e, nil);
	} else {
		f.created = f.dirtyd = 0;
		f.wstat(d);
		if (vers != 0 || mtime != 0){
			f.serverqid.vers = vers;
			f.d.qid.vers = vers;
			f.d.mtime = mtime;
		}
		f.time = millisec();
		rc <-= ref Crep.Create(nil, ref *f.d);
	}
	donec <-= f;
}

xputandwrite(x: Xop)
{
	putandwrite(x.tag, x.f, x.path, x.data, x.o, x.rc);
}
putandwrite(tag: int, f: ref Cfile, path: string, data: array of byte, off: big, rc: chan of ref Crep)
{
	async := (len data == 8192 && !f.created &&  off != big 0); #  (off % big (4*8192)) != big 0);
	l := len f.d.name;
	if (off != big 0 && (l > 4 && (f.d.name[l-4:l] == ".dis" || f.d.name[l-4:l] == ".sbl")))
		async = 1;
	d: ref Dir;
	if (f.dirtyd)
		d = f.d;
	f.dirtyd = 0;
	mkit := f.created;
	f.created = 0;
	if (async)
		donec <-= f;
	(fd, vers, mtime, e) := put(tag, path, f.opwrfd, d, data, off, mkit, 1);
	f.opwrfd = fd;
	if (e != nil){
		f.remove();
		if (!async)
			rc <-= ref Crep.Pwrite(e, len data);
		else
			fprint(stderr, "\n*** ofs: delayed write error ***\n\n");
	} else {
		if (!async){
			f.serverqid.vers = vers;
			f.d.qid.vers = vers;
			f.d.mtime= mtime;
			f.time = millisec();
			rc <-= ref Crep.Pwrite(nil, len data);
		} else {
			# Race, but safer this way.
			f.d.qid.vers = vers;
			if (mtime)
				f.d.mtime = mtime;
		}
	}
	if (!async)
		donec <-= f;
}

xputandwstat(x: Xop)
{
	putandwstat(x.tag, x.f, x.path, x.d, x.rc);
}
putandwstat(tag: int, f: ref Cfile, path: string, d: ref Dir, rc: chan of ref Crep)
{
	mkit := f.created;
	f.created = 0;
	f.dirtyd = 0;
	(fd, vers, mtime, e) := put(tag, path, f.opwrfd, d, nil, big 0, mkit, 1);
	f.opwrfd = fd;
	if (e != nil){
		f.remove();
		rc <-= ref Crep.Wstat(e, nil);
	} else {
		f.serverqid.vers = vers;
		f.wstat(d);
		f.d.mtime = mtime;
		f.time = millisec();
		rc <-= ref Crep.Wstat(nil, ref *f.d);
	}
	donec <-= f;
}

xputandsync(x: Xop)
{
	putandsync(x.tag, x.f, x.path, x.rc);
}
putandsync(tag: int, f: ref Cfile, path: string, rc: chan of ref Crep)
{
	mkit := f.created;
	f.created = 0;
	d: ref Dir;
	if (f.dirtyd)
		d = ref *f.d;
	f.dirtyd = 0;
	f.oldname = nil;
	f.fsfd = nil;
	rc <-= ref Crep.Sync(nil);
	(fd, vers, mtime, e) := put(tag, path, f.opwrfd, d, nil, big 0, mkit, 0);
	f.opwrfd = fd;
	if (f.oprdfd != NOFD){
		(fd, nil, nil, nil) = get(tag, path, f.oprdfd, OSTAT, 0, big 0);
		f.oprdfd = fd;
	}
	if (e != nil)
		f.remove();
	else {
		f.serverqid.vers = vers;
		if (vers != 0)
			f.d.qid.vers = vers;
		if (mtime != 0)
			f.d.mtime  = mtime;
		f.time = millisec();
	}
	donec <-= f;
}

xdelandremove(x: Xop)
{
	delandremove(x.tag, x.f, x.path, x.rc);
}
delandremove(tag: int, f: ref Cfile, path: string, rc: chan of ref Crep)
{
	oprc := opmux->rpc(ref Tmsg.Remove(2*tag, path));
	opr := <-oprc;
	e: string;
	pick oprr := opr {
	Remove =>
		f.remove();
	Error =>
		e = oprr.ename;
	}
	rc <-= ref Crep.Remove(e);
	if (f.oprdfd != NOFD){
		(fd, nil, nil, nil) := get(tag, path, f.oprdfd, OSTAT, 0, big 0);
		f.oprdfd = fd;
	}
	if (f.opwrfd != NOFD){
		(fd, nil, nil, nil) := put(tag, path, f.opwrfd, nil, nil, big 0, 0, 0);
		f.opwrfd = fd;
	}
	donec <-= f;
}

Bell Labs OSI certified Powered by Plan 9

(Return to Plan 9 Home Page)

Copyright © 2021 Plan 9 Foundation. All Rights Reserved.
Comments to [email protected].