Plan 9 from Bell Labs’s /usr/web/sources/contrib/nemo/octopus/port/copyserver.b

Copyright © 2021 Plan 9 Foundation.
Distributed under the MIT License.
Download the Plan 9 distribution.


implement CopyServer;

include "sys.m";
include "draw.m";
include "string.m"; 
include "styx.m";
include "styxservers.m";
include "sh.m";

CopyServer: module
{ 
  init: fn(ctxt: ref Draw->Context, argv: list of string);
};

sys: Sys;
str: String;
styx: Styx;
styxservers: Styxservers;
nametree: Nametree;
sh: Sh;

Styxserver, Navigator: import styxservers;
Tree: import nametree;
Tmsg, Rmsg: import styx;

SRCMNT: con "cpsrv_src";	# dirname prefix for temp source mount
DSTMNT: con "cpsrv_dst";	# dirname prefix for temp dest mount

RUNNING, EOF, DONE, 
RWERR, KILL, ABORT: con iota;	# state codes

CopyOpDesc: adt {
	# request arguments
	srcmntopt: string;	# source mount option
	srcaddr: string;	# source dial address
	srcfname: string;	# source file pathname
	srcoff: big;		# source read offset
	dstmntopt: string;	# dest mount option
	dstaddr: string;	# dest dial address
	dstfname: string; 	# dest file pathname
	dstoff: big;		# dest write offset
	nofbytes: big;		# nof bytes to copy from source to dest
	iounit: int;		# data buffer size to use for copying
	delay: int;		# artificial delay between r/w ops
	ctlfname: string;	# name of control file for this copy op
	# additional state
	state: int;		# current state of copy op
	bytecnt: big;		# nof bytes copied so far
	srcmntdir: string; 	# source mount dir
	srcfd: ref sys->FD;	# source file descriptor for reading
	dstmntdir: string;	# dest mount dir
	dstfd: ref sys->FD;	# dest file descriptor for writing
	qid: big;		# qid of the copy op control file in the server tree
	mtag: int;		# tag of the write request that started the copy op 
	reply: ref Rmsg;	# reply message to this request, for normal completion	
};

# BUG:
# the r/w synchronization for the bytescopied field 
# between the copy thread and the main server thread is dirty
# see BUG comments in routines handleTmsg and copyThread

cqid := big 0;			# seed for generating unique qids for the server file tree
Qroot, Qctrl: big;		# root and control file qids
nfy: chan of ref CopyOpDesc;	# notification copy threads -> main server thread
cpopsl: list of ref CopyOpDesc; # list of ongoing copy ops

debug: int;			# print msgs sent/received and other state info

# aux routines

dprint(msg: string)
{
	if (debug)
		sys->fprint(sys->fildes(2), "%s\n", msg);
}

buildDir(name: string, perm: int, qid: big): Sys->Dir
{
	d := sys->zerodir;
	d.name = name; 
	d.qid.path = qid;
	d.uid = "*"; 
	d.gid = "*";
	if (perm & Sys->DMDIR)
		d.qid.qtype = Sys->QTDIR;
	else 
		d.qid.qtype = Sys->QTFILE; 
	d.mode = perm;
	return(d);
}


# copy op management stuff; simple and non-optimized

printCopyOps()
{
	s := "ongoing copy ops: ";
	for (cl := cpopsl; cl != nil; cl = tl cl) 
		s = s + sys->sprint("%s ",(hd cl).ctlfname);
	s = s + "~";
	dprint(s);
}

initCopyOps()
{
	cpopsl = nil;
}

addCopyOp(cpop: ref CopyOpDesc)
{
	cpopsl = cpop :: cpopsl;
	printCopyOps();
}

rmvCopyOp(cpop: ref CopyOpDesc)
{
	cpopslnew: list of ref CopyOpDesc;

	cpopslnew = nil;
	for (cl := cpopsl; cl != nil; cl = tl cl)
		if (hd cl != cpop)
			cpopslnew = hd cl :: cpopslnew;
	cpopsl = cpopslnew;
	printCopyOps();
}

fndCopyOpByName(ctlfname: string): int
{
	for (cl := cpopsl; cl != nil; cl = tl cl)
		if ((hd cl).ctlfname == ctlfname)
			return(1);
	return(0);
}

fndCopyOpByQid(qid: big): ref CopyOpDesc
{
	for (cl := cpopsl; cl != nil; cl = tl cl)
		if ((hd cl).qid == qid)
			return(hd cl);
	return(nil); 
}

fndCopyOpByTag(mtag: int): ref CopyOpDesc
{
	for (cl := cpopsl; cl != nil; cl = tl cl)
		if ((hd cl).mtag == mtag)
			return(hd cl);
	return(nil); 
}


# cmd execution via shell

runcmd(ctxt: ref Draw->Context, cmdline: string): string
{
	sh = load Sh Sh->PATH;
	if (sh == nil)
		return(sys->sprint("could not load Sh module: %r"));

	(n, args) := sys->tokenize(cmdline, " \t\n");
	if (n == 0)
		return sys->sprint("empty command line string\n");

	c := chan of int;
	fd := sys->open("/prog/"+string sys->pctl(0,nil)+"/wait", Sys->OREAD);
	fds := array[2] of ref Sys->FD;
	sys->pipe(fds);

	spawn doruncmd(ctxt, args, fds[1], c);
	pid := <- c;

	pidstr := sys->sprint("%d ",pid);
	waitstr: string;
	buf := array [256] of byte;

	do {
		n = sys->read(fd, buf, len buf);
		waitstr = string buf[0:n];
	} while (!str->prefix(pidstr, waitstr));

	(res, d) := sys->fstat(fds[0]);
	if (d.length == big 0) { return(nil); } 

	n = sys->read(fds[0], buf, len buf);
	return(string buf[0:n]);
}

doruncmd(ctxt: ref Draw->Context, args: list of string, errfd: ref sys->FD, c: chan of int)
{
	pid := sys->pctl(sys->FORKFD, nil);
	sys->dup(errfd.fd, 2);
	c <-= pid;
	sh->run(ctxt, args);
}


# mount and unmount stuff; for convenience implemented using shell commands

unmount(opt, mnt: string)
{
	if (opt != nil) {
		runcmd(nil, "unmount "+mnt);
		runcmd(nil, "rm "+mnt);
	}
}


mount_open(mntopt, addr, mntdir, fname: string, off: big, mode : int): (string, ref sys->FD)
{
	fpath := fname;

	if (fpath[0] != '/')
		fpath = "/" + fpath;

	if (mntopt != nil) {

		err := runcmd(nil, "mkdir " + mntdir);
		if (err != nil)
			return((err, nil));

		authopt := "";
		if (str->in('A', mntopt))
			authopt = "-A";

		if (str->in('o', mntopt))
			err = runcmd(nil, "o/ofs" + " -m " + mntdir + " " + authopt + " " + addr + " /");
		else if (str->in('s', mntopt))
			err = runcmd(nil, "mount" + " " + authopt + " " + addr + " " + mntdir);
		else {
			sys->fprint(sys->fildes(2), "copy server fatal: illegal option: %s", mntopt);
			raise "fatal:mntopt";
		}

		if (err != nil) {
			runcmd(nil, "rm " + mntdir);
			return((err, nil));
		}
		
		fpath = mntdir + fpath;

	}

	fd: ref sys->FD;
	if (mode == sys->OREAD)
		fd = sys->open(fpath, mode);
	else if (mode == sys->OWRITE)
		fd = sys->create(fpath, mode, 8r777);
		#BUG:
		# ownership and access rights when creating a file?
	
	if (fd == nil) {
		err := sys->sprint("could not open %s: %r", fname);
		unmount(mntopt, mntdir);
		return ((err, nil));
	}

	sys->seek(fd, off, Sys->SEEKSTART);

	return((nil, fd));
}


# cmd parsing stuff

isValidMntOption(opt: string): int
{
	return ((opt == "-s") || (opt == "-sA") | 
	        (opt == "-o") || (opt == "-oA"));
}

isValidInt(num: string): (int, int)
{
	(i, s) := str->toint(num, 10);
	return(((s == nil) && (i >= 0), i));
} 

isValidBig(num: string): (int, big)
{
	(b, s) := str->tobig(num, 10);
	return(((s == nil) && (b >= big 0), b));
} 

parseFileSpec(cmdlist: list of string): (string, string, string, string, big, list of string)
{
	if (cmdlist == nil)
		return (("no file name", nil, nil, nil, big 0, nil));
	fname := hd cmdlist; cmdlist = tl cmdlist;
	opt := string nil;
	addr := string nil;
 
	if (fname[0] == '-') {
		opt = fname;
		if (!isValidMntOption(opt))
			return (("invalid mount option", nil, nil, nil, big 0, nil));
		
		if (cmdlist == nil)
			return (("no dial address", nil, nil, nil, big 0, nil));
		addr = hd cmdlist; cmdlist = tl cmdlist;
		
		if (cmdlist == nil)
			return (("no file name", nil, nil, nil, big 0, nil));
		fname = hd cmdlist; cmdlist = tl cmdlist;
	}	
	
	if (cmdlist == nil)
		return (("no offset", nil, nil, nil, big 0, nil));

	(ok, off) := isValidBig(hd cmdlist); cmdlist = tl cmdlist;
	if (!ok)
		return (("invalid offset", nil, nil, nil, big 0, nil));

	return((nil, opt, addr, fname, off, cmdlist)); 
}

parseCmd(cmdline: string): (string, ref CopyOpDesc)
{ 
	dprint(sys->sprint("request: %s", cmdline));

	(n, cmdlist) := sys->tokenize(cmdline, " \n"); 
	
	if (cmdlist == nil)
		return (("empty cmd string", nil));
	
	cmd := hd cmdlist; cmdlist = tl cmdlist;
	if (cmd != "copy")
		return (("copy expected", nil));

	cpop := ref CopyOpDesc(nil, nil, nil, big 0, nil, nil, nil, big 0, big 0, 0, 0, nil,
	                       0, big 0, nil, nil, nil, nil, big 0, 0, nil);

	err: string;

	(err, cpop.srcmntopt, cpop.srcaddr, cpop.srcfname, cpop.srcoff, cmdlist) = parseFileSpec(cmdlist);
	if (err != nil)
		return (("source: " + err, nil));

	(err, cpop.dstmntopt, cpop.dstaddr, cpop.dstfname, cpop.dstoff, cmdlist) = parseFileSpec(cmdlist);
	if (err != nil)
		return (("dest: " + err, nil));
	
	if (len cmdlist != 4)
		return (("nof bytes, iounit, delay and ctlfname expected", nil));

	ok: int;

	(ok, cpop.nofbytes) = isValidBig(hd cmdlist); cmdlist = tl cmdlist;
	if (!ok)
		return (("invalid nofbytes", nil));

	(ok, cpop.iounit) = isValidInt(hd cmdlist); cmdlist = tl cmdlist;
	if (!ok)
		return (("invalid iounit", nil));

	(ok, cpop.delay) = isValidInt(hd cmdlist); cmdlist = tl cmdlist;
	if (!ok)
		return (("invalid delay", nil));

	cpop.ctlfname = hd cmdlist; cmdlist = tl cmdlist;

	return ((nil, cpop)); 
}


# copy thread stuff

startCopyThread(filetree: ref Tree, msg: ref Tmsg.Write): string
{
	(err, cpop) := parseCmd(string msg.data);
	
	if (err != nil) 
		return(err); 

	if ((cpop.ctlfname == "ctl") || fndCopyOpByName(cpop.ctlfname))
		return("ctl file name already in use");

	cpop.srcmntdir = SRCMNT + "_" + cpop.ctlfname;
	(err, cpop.srcfd) = mount_open(cpop.srcmntopt, cpop.srcaddr, cpop.srcmntdir, 
	                               cpop.srcfname, cpop.srcoff, sys->OREAD);
	if (err != nil)
		return(err);

	cpop.dstmntdir = DSTMNT + "_" + cpop.ctlfname;
	(err, cpop.dstfd) = mount_open(cpop.dstmntopt, cpop.dstaddr, cpop.dstmntdir, 
	                               cpop.dstfname, cpop.dstoff, sys->OWRITE);
	if (err != nil) {
		unmount(cpop.srcmntopt, cpop.srcmntdir); 
		return(err);
	}
	
	cpop.state = RUNNING;
	cpop.bytecnt = big 0;
	cpop.qid = cqid++;
	cpop.mtag = msg.tag;
	cpop.reply = ref Rmsg.Write(msg.tag, len msg.data);

	filetree.create(Qroot, buildDir(cpop.ctlfname, 8r666, cpop.qid));
	#BUG:
	# ownership and access rights when creating a control file?
	addCopyOp(cpop);

	spawn copyThread(cpop);

	return(nil);
}

copyThread(cpop: ref CopyOpDesc)
{
	cnt := big 0; 
	data := array [cpop.iounit] of byte;
	
	dprint(sys->sprint("copyThread started: %s", cpop.ctlfname));
  
	while (cpop.state == RUNNING) {
		rcnt := len data;
		if ((cpop.nofbytes > big 0) && (big rcnt > cpop.nofbytes - cnt)) 
				rcnt = int (cpop.nofbytes - cnt); 
		
		n1 := sys->read(cpop.srcfd, data, rcnt);
		if (n1 < 0) {
			cpop.reply = ref Rmsg.Error(cpop.mtag, sys->sprint("read error: %r"));
			cpop.state = RWERR;
			break;
		}
		else if (n1 == 0) {
			cpop.state = EOF;
			break;
		}
		
		n2:= sys->write(cpop.dstfd, data, n1);
		if (n2 != n1) {
			cpop.reply = ref Rmsg.Error(cpop.mtag, sys->sprint("write error: %r")); 
			cpop.state = RWERR;
			break;
		}
		
		cnt = cnt + big n1;
		cpop.bytecnt = cnt;

		# BUG: 
		# we change the value of bytescopied in "a single shot"
		# however, this does not guarantee read atomicity
		# so the main server thread may read invalid data
		# see BUG comment in function handleTMsg

		if (cnt == cpop.nofbytes) {
			cpop.state = DONE;
			break;
		}
		 
		sys->sleep(cpop.delay);
	}
	
	if (cpop.state == KILL)
		cpop.reply = ref Rmsg.Error(cpop.mtag, sys->sprint("killed after %bd bytes", cnt));
	nfy <- = cpop; # termination signal to main server thread

	dprint(sys->sprint("copyThread stopped: %s", cpop.ctlfname));
}

gcCopyOp(srv: ref Styxserver, filetree: ref Tree, cpop: ref CopyOpDesc)
{
	termcodes := array [ABORT+1] of {"RUNNING", "EOF", "DONE", "RWERRROR", "KILLED", "ABORTED"}; 
	dprint(sys->sprint("term signal from %s: %s", cpop.ctlfname, termcodes[cpop.state]));
	
	unmount(cpop.srcmntopt, cpop.srcmntdir); 
	unmount(cpop.dstmntopt, cpop.dstmntdir);
	
	filetree.remove(cpop.qid);
	rmvCopyOp(cpop);

	# send reply msg to unblock client
	if (cpop.state != ABORT)
		srv.reply(cpop.reply);
} 

killCopyThread(cpop: ref CopyOpDesc, cmd: int)
{
	cpop.state = cmd;  # force copy thread to terminate, if not done yet
} 


killAllCopyThreads(srv: ref Styxserver, filetree: ref Tree, cmd: int)
{
	for (cl := cpopsl; cl != nil; cl = tl cl)
		killCopyThread(hd cl, cmd);
		
	while (cpopsl != nil) {
		cpop := <- nfy;
		gcCopyOp(srv, filetree, cpop);
	}
		
}


# main server thread stuff

handleTmsg(srv: ref Styxserver, filetree: ref Tree, msg: ref Tmsg)
{
	if (msg == nil) {
		killAllCopyThreads(srv, filetree, ABORT);
		dprint("stopping copy server");
		srv.default(msg);   # kills this process
		sys->fprint(sys->fildes(2), "copy server fatal: should be dead at this point");
		raise "fatal:noterm";
	}
	pick m := msg {
		Open => {
			fid := srv.open(m);
			if (fid != nil) {
				if ((fid.path != Qctrl) && (fid.path != Qroot) && 
				   ((cpop:=fndCopyOpByQid(fid.path)) != nil))
					# cache for subsequent reads on this fid
					fid.data = array of byte sys->sprint("%bd",cpop.bytecnt);
					# BUG:
					# reading bytescopied is not necessarily atomic
					# see BUG comment in function copyThread
			}
		}
		Write => {
			(fid, err) := srv.canwrite(m);
			if (err != nil) 
				srv.reply(ref Rmsg.Error(m.tag, err)); 
			else if (fid.path == Qctrl) {
				err = startCopyThread(filetree, m); 
				if (err != nil)
					srv.reply(ref Rmsg.Error(m.tag, err));
				# if all went well, we do not reply to block the client
			}
			else {
				(n,cmd) := sys->tokenize(string m.data, " \n");
              			if ((n != 1) || (hd cmd != "kill"))
					srv.reply(ref Rmsg.Error(m.tag,"usage: kill")); 
				else {
					if ((cpop:=fndCopyOpByQid(fid.path)) != nil)
						killCopyThread(cpop, KILL);
					srv.reply(ref Rmsg.Write(m.tag, len m.data));
					# killing a terminated copy op succeeds as well
				} 
			}
		}
		Read => {
			(fid, err) := srv.canread(m);
			if (err != nil)
				srv.reply(ref Rmsg.Error(m.tag, err)); 
			else if ((fid.path == Qctrl) || (fid.path == Qroot)) 
				srv.default(msg);
			else 
				srv.reply(styxservers->readbytes(m, fid.data));
		}
		Flush => {
			srv.default(msg);
			if ((cpop:=fndCopyOpByTag(m.oldtag)) != nil)
				killCopyThread(cpop, ABORT);
		}
		* => { srv.default(msg); }
	};
}

serve(fd: ref sys->FD, c: chan of int)
{
	pid := sys->pctl(Sys->FORKNS|Sys->NEWFD|Sys->NEWPGRP, list of {1,2,fd.fd});
	
	styx->init();
	styxservers->init(styx);
	nametree->init();

	styxservers->traceset(debug);

	Qroot = cqid++;
	Qctrl  = cqid++;

	(filetree, filetreeop) := nametree->start();
	filetree.create(Qroot, buildDir(".", 8r555|Sys->DMDIR, Qroot));
	filetree.create(Qroot, buildDir("ctl", 8r222, Qctrl));

	(tchan, srv) := Styxserver.new(fd, Navigator.new(filetreeop), Qroot);
  
	initCopyOps(); 
	
	nfy = chan of ref CopyOpDesc;

	dprint("copy server started");

	c <- = pid;

	while (1) {
		alt {
			tmsg := <- tchan =>
				handleTmsg(srv, filetree, tmsg); 			
			cpop := <- nfy =>
				gcCopyOp(srv, filetree, cpop);
		}
	}

}


init(nil: ref Draw->Context, args: list of string)
{
	dirname: string; ok: int;
	
	sys = load Sys Sys->PATH;
	str = load String String->PATH;

	if ((len args < 2) || (len args > 3)) {
		sys->print("usage: copyserver [-d] mnt\n"); 
		exit;
	} 
	
	if (len args == 2) {
		debug = 0;
		dirname = hd tl args;
	}
	else if (hd tl args == "-d") { 
		debug = 1; 
		dirname = hd tl tl args;
	}
	else {
		sys->print("usage: copyserver [-d] mnt\n"); 
		exit; 
	}
	
	styx = load Styx Styx->PATH;
	styxservers = load Styxservers Styxservers->PATH;
	nametree = load Nametree Nametree->PATH;

	fds := array[2] of ref Sys->FD;
	sys->pipe(fds);
	c := chan of int;

	spawn serve(fds[0], c);
	<- c;
	fds[0] = nil;

	res := sys->mount(fds[1], nil, dirname, sys->MREPL, nil);
	if (res == -1) {
		sys->print("could not mount: %r\n"); 
		exit;
	} 
	
	sys->print("server mounted on %s\n", dirname);  
	fds[1] = nil;
}

Bell Labs OSI certified Powered by Plan 9

(Return to Plan 9 Home Page)

Copyright © 2021 Plan 9 Foundation. All Rights Reserved.
Comments to [email protected].