Plan 9 from Bell Labs’s /usr/web/sources/contrib/nemo/octopus/port/ofs/ofs.b

Copyright © 2021 Plan 9 Foundation.
Distributed under the MIT License.
Download the Plan 9 distribution.


# Op to styx file server
implement Ofs;

include "sys.m";
	sys: Sys;
	MREPL, FD, MAFTER, DMDIR, QTDIR, fprint, pctl, fildes, MBEFORE, MCREATE, create, OREAD,
	open, read, ORDWR, nulldir, dial, sprint, pipe, stat, mount, werrstr, OTRUNC: import sys;
include "draw.m";
include "styx.m";
	styx: Styx;
	Rmsg, Tmsg: import styx;
include "ofsstyx.m";
	styxs: Styxservers;
	Styxserver, Fid, Ebadfid, Navigator, Navop: import styxs;
	Enotdir, Enotfound: import styxs;
include "arg.m";
	arg: Arg;
	usage: import arg;
include "keyring.m";
include "security.m";
include "env.m";
	env: Env;
	getenv: import env;
include "string.m";
	str: String;
	splitstrl: import str;
include "netutil.m";
	util: Netutil;
	Client, netmkaddr, authfd : import util;
include "op.m";
	op: Op;
include "opmux.m";
	opmux: Opmux;
include "stop.m";
	stop: Stop;
include "error.m";
	err: Error;
	checkload, kill, stderr, error: import err;
include "ofsnotify.m";
	notify: Ofsnotify;

# Module interface of the Op to Styx file server: init is the only entry point.
Ofs: module {
	# The draw context is not used (init's definition names it nil);
	# argv carries the command-line flags, the Op address and an optional path.
	init: fn(nil: ref Draw->Context, argv: list of string);
};

# Per-request counters, one per kind of Styx request handled by this file.
# Most are incremented in ofsreq; nwalk is incremented in navreq.
# NOTE(review): nothing in this file increments nstat -- confirm whether
# it is meant to be updated elsewhere.
Stats: adt {
	nwalk, nopen, ncreate, nread, nwrite: int;
	nclunk, nstat, nremove, nwstat: int;

	# Print all the counters and their total on stderr.
	dump: fn(s: self ref Stats);
};

# Process model:
# 	fsrequestproc accepts one request after another from the styxserver,
#	and navrequestproc does the same for the navigator. Each request is
#	handled by ofsreq or navreq, respectively.
#	NOTE: as written here, both procs call ofsreq/navreq directly instead
#	of spawning one process per request, so requests coming from the same
#	source are processed one at a time (see the BUG note in fsrequestproc).
#
#	In the stop cache, a single process synchronizes access to the tree and takes
#	one request after another. However, it spawns other processes to speak
#	Op. Cached requests are answered by the central cfsproc process. Other
#	requests spawn a process to issue the Op RPC(s) concurrently.
#	Such Op speaker processes are spawned by xcfsproc(), to control the maximum
#	number of concurrent requests that we might place on the remote server.
#
#	replyproc accepts one reply after another and sends each one to the client.


# Global settings; the flags are set from the command line in init.
debug := 0;		# incremented by each -d flag
doauth := 1;		# cleared by -A; when set, the Op connection goes through authfd
dostats := 0;		# set by -v; makes terminate dump the statistics
stats : ref Stats;	# request counters, allocated in service
user := "none";		# replaced by the second result of authfd when doauth is set

# Print every request counter on stderr, one per line, followed by the
# grand total. The total adds 2 for the attach/version requests, which
# have no counter of their own.
Stats.dump(s: self ref Stats)
{
	labels := array[] of {
		"walk", "open", "create", "read", "write",
		"clunk", "stat", "remove", "wstat"
	};
	counts := array[] of {
		s.nwalk, s.nopen, s.ncreate, s.nread, s.nwrite,
		s.nclunk, s.nstat, s.nremove, s.nwstat
	};

	fprint(stderr,"styx:\n");
	total := 2;	# attach/version
	for (i := 0; i < len labels; i++){
		fprint(stderr,"\t%s\t%d\n", labels[i], counts[i]);
		total += counts[i];
	}
	fprint(stderr,"\ttotal\t%d\n", total);
}


# Serve one navigator operation: forward it to the stop cache and send
# the outcome through the reply channel carried by the operation.
navreq(m: ref Navop)
{
	path := m.path;
	pick nop := m {
	Stat =>
		nop.reply <-= stop->stat(m.tag, path);
	Walk =>
		name := nop.name;
		if (name != nil && name != "."){
			if (name[0] != '/'){
				# Then we get one-by-one walks
				nop.reply <-= stop->walk1(m.tag, path, name);
			} else {
				# see ./ofsstyxservers.b
				# A name like "/a/b/c" announces that a Twalk for
				# {a, b, c} is about to be processed for this qid.
				# This is the point where the cache entry is validated.
				stats.nwalk++;
				(nil, dumpreq) := splitstrl(name, "!!DUMP");
				if (dumpreq != nil){
					# walking a name containing "!!DUMP" prints the statistics
					# stop->dump();
					stats.dump();
					opmux->dump();
				}
				nop.reply <-= stop->validate(m.tag, path, name);
			}
		} else
			nop.reply <-= stop->validate(m.tag, path, "");
	Readdir =>
		# BUG: This asks stop to read a directory a lot of times,
		# even for a single directory.  Fixing this requires changes to
		# Styxserver.read (label Dread in ofsstyx.b).
		# Indeed, the real BUG is that the navigator should go.
		# We can adjust ofsstyx.b to ask us what we know we have
		# in stop.b

		(entries, e) := stop->readdir(m.tag, path, nop.count, nop.offset);
		if (e == nil){
			# one reply per entry; (nil, nil) marks the end of the list
			while (entries != nil){
				nop.reply <-= (ref hd entries, nil);
				entries = tl entries;
			}
			nop.reply <-= (nil, nil);
		} else
			nop.reply <-= (nil, e);
	}
}

# Process one Styx request read by the styxserver.
# srv is the server used to check the request and to send the reply;
# req is the request (a nil req is ignored).
# File operations are forwarded to the stop cache; anything not handled
# here is left to srv.default.
ofsreq(srv: ref Styxserver, req: ref Tmsg)
{
	if (req != nil)
	pick m := req {
	Readerror =>
		fprint(stderr, "ofs: read error: %s\n", m.error);
	Attach =>
		# Tattach wont Twalk "/", and stop  validates at Twalk
		(nil, e) := stop->validate(m.tag, big 0, "");
		if (e != nil)
			srv.reply(ref Rmsg.Error(m.tag, e));
		else
			srv.attach(m);
	Open =>
		stats.nopen++;
		umode := m.mode;	# save OTRUNC
		fid := srv.open(m);
		if (fid != nil)
		if ((umode & OTRUNC) != 0){
			# Truncation is a wstat setting the length to 0.
			# Its result is ignored here.
			d := ref nulldir;
			d.length = big 0;
			stop->wstat(m.tag, fid.path, d);	# this truncates.
		}
	Read =>
		stats.nread++;
		(fid, e) := srv.canread(m);
		if(e != nil){
			srv.reply(ref Rmsg.Error(m.tag, e));
			return;
		}
		if(fid.qtype & QTDIR){
			# directory reads are served by the styxserver itself
			srv.default(m);
			return;
		}
		(d, re) := stop->pread(m.tag, fid.path, m.count, m.offset);
		if (re != nil)
			srv.reply(ref Rmsg.Error(m.tag, re));
		else
			srv.reply(ref Rmsg.Read(m.tag, d));
	Write =>
		stats.nwrite++;
		(fid, e) := srv.canwrite(m);
		if(e != nil){
			srv.reply(ref Rmsg.Error(m.tag, e));
			return;
		}
		we : string;
		# Must start at 0: an empty write never calls pwrite, and the
		# count is still sent back in the Rwrite.
		cnt := 0;
		if (len m.data > 0)
			(cnt, we) = stop->pwrite(m.tag, fid.path, m.data, m.offset);
		if (we != nil)
			srv.reply(ref Rmsg.Error(m.tag, we));
		else
			srv.reply(ref Rmsg.Write(m.tag, cnt));
	Flush =>
		# BUG: we accept concurrent requests.
		# a flush should cancel them, as said in flush(5).
		stop->flush(m.tag, m.oldtag);
		srv.reply(ref Rmsg.Flush(m.tag));
	Clunk =>
		stats.nclunk++;
		fid := srv.getfid(m.fid);
		if(fid == nil) {
			srv.reply(ref Rmsg.Error(m.tag, Ebadfid));
			return;
		}
		# The fid is released first; fid.path is still valid for the sync.
		srv.delfid(fid);
		e := stop->sync(m.tag, fid.path);
		if (e != nil)
			srv.reply(ref Rmsg.Error(m.tag, e));
		else
			srv.reply(ref Rmsg.Clunk(m.tag));
	Create =>
		stats.ncreate++;
		(fid, mode, d, e) := srv.cancreate(m);
		if (e != nil){
			srv.reply(ref Rmsg.Error(m.tag, e));
			return;
		}
		(d, e) = stop->create(m.tag, fid.path, d);
		if (e != nil){
			srv.reply(ref Rmsg.Error(m.tag, e));
			return;
		}
		# the fid now refers to the new file
		fid.open(mode, d.qid);
		srv.reply(ref Rmsg.Create(m.tag, d.qid, 8*1024));
	Remove =>
		stats.nremove++;
		(fid, nil, e) := srv.canremove(m);
		# As said in remove(5), the fid is clunked whether or not
		# the remove succeeds, so release it before checking for errors.
		if (fid != nil)
			srv.delfid(fid);
		if (e == nil)
			e = stop->remove(m.tag, fid.path);
		if (e != nil){
			srv.reply(ref Rmsg.Error(m.tag, e));
			return;
		}
		srv.reply(ref Rmsg.Remove(m.tag));
	Wstat =>
		stats.nwstat++;
		fid := srv.getfid(m.fid);
		if(fid == nil) {
			srv.reply(ref Rmsg.Error(m.tag, Ebadfid));
			return;
		}
		(nil, e) := stop->wstat(m.tag, fid.path, ref m.stat);
		if (e != nil)
			srv.reply(ref Rmsg.Error(m.tag, e));
		else
			srv.reply(ref Rmsg.Wstat(m.tag));
	*  =>
		srv.default(m);
	}
}

# Shut the service down and exit the calling process.
# srv is the styx server whose reply process must be stopped, navc is
# the channel read by navrequestproc, and optoo says whether opmux must
# be terminated as well (fsrequestproc passes 0 when woken through endc
# and 1 when the request channel delivered nil).
terminate(srv: ref Styxserver, navc: chan of ref Navop, optoo: int)
{
	if (dostats){	# -v: print the statistics before leaving
		stats.dump();
		opmux->dump();
	}
	if (notify != nil)
		notify->gone();
	navc <-= nil;	# a nil Navop makes navrequestproc exit
	stop->term();
	if (optoo)
		opmux->term();
	srv.replychan <-= nil;	# a nil reply makes replyproc leave its loop
	kill(pctl(0,nil), "killgrp");	# kill tmsgreader and any other
	exit;
}

# Main loop for Styx requests: take requests from reqc one at a time and
# hand them to ofsreq. A message on endc, or a nil request, terminates
# the service (terminate does not return).
fsrequestproc(srv: ref Styxserver, reqc: chan of ref Tmsg, navc: chan of ref Navop, endc: chan of string)
{
	if (debug){
		# print a command that kills this process group
		pid := pctl(0,nil);
		fprint(stderr, "echo killgrp >/prog/%d/ctl\n", pid);
	}
	if (notify != nil)
		notify->arrived();
	for(;;){
		# BUG: should cache fs and nav processes.
		alt {
		<- endc =>
			# opmux is left alone in this case
			terminate(srv, navc, 0);
		req := <-reqc =>
			if (req == nil)
				terminate(srv, navc, 1);
			else
				ofsreq(srv, req);
		}
	}
}

# Serve navigator operations arriving on navc, one after another,
# until a nil operation is received; then the process exits.
navrequestproc(navc: chan of ref Navop)
{
	while((nop := <-navc) != nil)
		navreq(nop);
	exit;
}

# Send the replies queued on srv.replychan to the client, one at a time.
# A nil reply ends the process. A failed write also ends it, after
# terminating opmux and the stop cache.
replyproc(srv: ref Styxserver)
{
	while((reply := <- srv.replychan) != nil){
		if (srv.replydirect(reply) >= 0)
			continue;
		# could not write the reply: shut everything down
		opmux->term();
		stop->term();
		return;
	}
}

# Issue an Op attach for path through opmux and wait for the answer.
# Returns nil on success, the error name carried by an error reply, or
# "can't attach" for any other kind of reply.
# NOTE(review): the user name sent is the literal "nemo", not the global
# user -- confirm that this is intended.
attach(path: string) : string
{
	replyc := opmux->rpc(ref (Op->Tmsg).Attach(0, "nemo", path));
	reply := <- replyc;
	ename: string;
	pick r := reply {
	Attach =>
		ename = nil;
	Error =>
		ename = r.ename;
	* =>
		ename = "can't attach";
	}
	return ename;
}

# Recovery settings. dorecover is set by -r; the others are copies of the
# connection parameters saved by init for recover to use.
dorecover := 0;		# keep retrying in recover while non-zero
recoveraddr: string;	# address of the Op server
recoveralg: string;	# algorithm given to authfd (-C)
recoverkfile: string;	# key file given to authfd (-k)
recoverpath: string;	# path to attach to

# Try to reestablish the connection to the Op server, using the
# parameters saved by init in the recover* globals.
# Each attempt opens or dials the address, authenticates (unless
# disabled with -A) and attaches to recoverpath. Attempts are repeated
# for as long as dorecover is set.
# Returns the new connection, or nil if dorecover was cleared.
recover(): ref FD
{
	addr := recoveraddr;
	alg := recoveralg;
	kfile := recoverkfile;
	opfd, opcfd: ref FD;
	retrying := 0;
	while(dorecover){
		# Do not spin when the server is unreachable: wait a
		# little before each attempt after the first one.
		if (retrying)
			sys->sleep(1000);
		retrying = 1;
		# Always start from a fresh connection. A descriptor left from
		# a failed attempt must not be authenticated again.
		opfd = opcfd = nil;
		if (len addr > 0 && addr[0] == '/')
			opfd = open(addr, ORDWR);
		if (opfd == nil){
			addr = netmkaddr(addr, "tcp", "op");
			(rc, c) := dial(addr, nil);
			if (rc < 0)
				continue;
			opfd = c.dfd;
			opcfd = c.cfd;
			c.dfd = c.cfd = nil;
		}
		if (doauth){
			(afd, ae) := authfd(opfd, Client, alg, kfile, addr);
			opfd = afd; afd = nil;
			if (debug && ae != nil)
				fprint(stderr, "ofs: authenticated: %s\n", ae);
			if (opfd == nil)
				continue;
			user = ae;
		}
		e := attach(recoverpath);
		if (e == nil)
			return opfd;
	}
	return nil;
}

# Start the file server.
# pidc receives the result of the initial pctl call, which tells the
# caller that this process has its own name space, process group and
# file descriptors; sfd is the descriptor where Styx is spoken; cfd is
# the connection to the Op server; cdir and lag are handed to the stop
# cache; path is the remote path to attach to.
# Returns after spawning the reply, navigator and request processes.
service(pidc: chan of int, sfd, cfd: ref FD, cdir: string, path: string, lag: int)
{
	# This must come first: only 0, 1, 2, sfd and cfd are kept open.
	pidc <-= sys->pctl(Sys->FORKNS|Sys->NEWPGRP|Sys->NEWFD, list of {0,1,2,sfd.fd,cfd.fd});
	stats = ref Stats(0, 0, 0, 0, 0, 0, 0, 0, 0);
	op->init();
	endc := chan[1] of string;
	opmux->init(cfd, op, endc);
	e := attach(path);
	if (e != nil){
		sfd = cfd = nil;
		# report why the attach failed, not just that it did
		error(sprint("ofs: can't attach: %s", e));
	} else if (debug)
		fprint(stderr, "attached as %s\n", user);
	styx->init();
	styxs->init(styx);
	navc := chan of ref Navop;
	nav := Navigator.new(navc);
	(reqc, srv) := Styxserver.new(sfd, nav, big 0);
	# replies are queued here and written by replyproc
	srv.replychan = chan[10] of ref Styx->Rmsg;
	spawn replyproc(srv);
	if (stop->init(styx, opmux, cdir, lag) != nil){
		fprint(stderr, "should not happen: cache init failed\n");
		raise("fail:stop");
	}
	spawn navrequestproc(navc);
	spawn fsrequestproc(srv, reqc, navc, endc);
	if (debug)
		fprint(stderr, "ps | grep ' %d ' \n", pctl(0,nil));
}

# Read from fd the string sent by the server for an "auto" mount:
# a header of up to 9 bytes holding a decimal length, followed by that
# many bytes of data.
# Returns the data as a string, or nil on error (bad header, bad length
# or short read).
oimport(fd: ref FD): string
{
	hdr := array[10] of byte;

	nr := read(fd, hdr, 9);
	if (nr <= 0){
		if (debug)fprint(stderr, "ofs: import: %r\n");
		return nil;
	}
	# Convert only the bytes actually read, not the whole buffer.
	dr := int (string hdr[0:nr]);
	if (dr <= 0){
		# nothing to import, or a header that is not a valid length
		if (debug)fprint(stderr, "ofs: import: bad length\n");
		return nil;
	}
	data := array[dr] of byte;
	# A single read may return fewer bytes than requested on a
	# stream; readn keeps reading until dr bytes, EOF or error.
	nr = sys->readn(fd, data, dr);
	if (nr != dr){
		if (debug)fprint(stderr, "ofs: import: short read\n");
		return nil;
	}
	return string data;
}

# Program entry point.
# Loads the modules, parses the command line, connects to the Op server
# at addr (a file when addr starts with '/', a network address
# otherwise), authenticates unless -A was given, and then either serves
# Styx on standard input (no -m) or mounts the service at the mount
# point given with -m. The mount point "auto" asks the server for a
# name and mounts at /devs/name.
init(nil: ref Draw->Context, args: list of string)
{
	sys = load Sys Sys->PATH;
	err = load Error Error->PATH;
	err->init();
	styx = checkload(load Styx Styx->PATH, Styx->PATH);
	styxs = checkload(load Styxservers "/dis/o/ofsstyx.dis", "/dis/o/ofsstyx.dis");
	env = checkload(load Env Env->PATH, Env->PATH);
	str = checkload(load String String->PATH, String->PATH);
	op = checkload(load Op Op->PATH, Op->PATH);
	opmux = checkload(load Opmux Opmux->PATH, Opmux->PATH);
	stop = checkload(load Stop Stop->PATH, Stop->PATH);
	util = checkload(load Netutil Netutil->PATH, Netutil->PATH);
	arg = checkload(load Arg Arg->PATH, Arg->PATH);

	mnt : string;
	cdir, alg, kfile : string;
	path := "/";
	arg->init(args);
	lag := 1000;
	# -v is accepted below, so it is listed in the usage as well
	arg->setusage("ofs [-Adrv] [-C alg] [-k keyfile] [-c dir] [-m mnt] [-l n] addr [path]");
	while((opt := arg->opt()) != 0) {
		case opt{
		'A' =>
			doauth = 0;
		'C' =>
			alg = arg->earg();
		'k' =>
			kfile = arg->earg();
		'c' =>
			cdir = arg->earg();
		'd' =>
			# each extra -d turns on debugging in one more module
			debug++;
			if (debug > 1)
				opmux->debug = 1;
			if (debug > 2)
				stop->debug = 1;
			styxs->traceset(1);
		'v' =>
			dostats = 1;
		'l'	=>
			lag = int arg->earg();
		'm' =>
			mnt = arg->earg();
		'r' =>
			dorecover = 1;
		* =>
			usage();
		}
	}
	addr : string;
	args = arg->argv();
	case len args {
	2 =>
		addr = hd args;
		path = hd tl args;
	1 =>
		addr = hd args;
	* =>
		usage();
	}

	opfd, opcfd : ref FD;
	# An address starting with '/' is tried as a file first.
	# The length check avoids indexing an empty address.
	if (len addr > 0 && addr[0] == '/')
		opfd = open(addr, ORDWR);
	if (opfd == nil){
		addr = netmkaddr(addr, "tcp", "op");
		(rc, c) := dial(addr, nil);
		if (rc < 0)
			error(sprint("%s: %r\n", addr));
		opfd = c.dfd;
		opcfd = c.cfd;
		c.dfd = c.cfd = nil;
	}
	if (doauth){
		(afd, ae) := authfd(opfd, Client, alg, kfile, addr);
		opfd = afd; afd = nil;
		if (debug && ae != nil)
			fprint(stderr, "ofs: authenticated: %s\n", ae);
		if (opfd == nil)
			error(sprint("ofs: fail: %s: %r\n", addr));
		user = ae;
	}
	pidc := chan[1] of int;
	if (mnt == nil)
		service(pidc, fildes(0), opfd, cdir, path, lag);
	else {
		if (mnt == "auto"){
			sname := oimport(opfd);
			if (sname == nil)
				error("ofs: fail: import\n");
			mnt = "/devs/" + sname;
			fprint(stderr, "ofs: importing %s\n", sname);
			create(mnt, OREAD, DMDIR|8r0555);	# in case it does not exist
			# the notifier is optional: a failed load is not an error
			notify = load Ofsnotify Ofsnotify->PATH;
			if (notify != nil)
				notify->init(sname);
		} else if (dorecover){
			# save what recover needs to reconnect later
			recoveraddr = addr;
			recoveralg = alg;
			recoverkfile= kfile;
			recoverpath = path;
			opmux->recoverfn = recover;
		}
		pfds := array[2] of ref FD;
		if (pipe(pfds) < 0)
			error(sprint("ofs: pipe: %r"));
		spawn service(pidc, pfds[0], opfd, cdir, path, lag);
		# wait until service has made its own copy of the descriptors
		<- pidc;
		pfds[0] = nil;
		opfd = nil;
		if (mount(pfds[1], nil, mnt, MREPL|MCREATE, nil) < 0)
			error(sprint("ofs: mount: %r"));
		# the mount keeps the pipe open; our end is no longer needed
		pfds[1] = nil;
	}
}


Bell Labs OSI certified Powered by Plan 9

(Return to Plan 9 Home Page)

Copyright © 2021 Plan 9 Foundation. All Rights Reserved.
Comments to [email protected].