implement Pgcfs;
# The Dbfs code I started with as a template said :
# Copyright © 1999 Vita Nuova Limited. All rights reserved.
# Revisions copyright © 2002 Vita Nuova Holdings Limited. All rights reserved.
# Ok well, I raise you Postgresql catalogue tables as the datastore [email protected] is this GPL now ? Tis a bit confusing. I'm sure you don't mind.
#pgcfs && cat /mnt/db/1 mk && ./pgcfs && ls -l /mnt/db/proc/ | grep www
# mk && ./pgcfs && ls /mnt/db/user/www/proc
debug : con 0;
include "sys.m";
sys: Sys;
Qid: import Sys;
include "draw.m";
include "arg.m";
include "styx.m";
styx: Styx;
Tmsg, Rmsg: import styx;
include "styxservers.m";
styxservers: Styxservers;
Fid, Styxserver, Navigator, Navop: import styxservers;
Enotfound, Eperm, Ebadarg: import styxservers;
include "bufio.m";
bufio: Bufio;
Iobuf: import bufio;
include "keyring.m";
keyring: Keyring;
include "pgbase.m";
pgbase :PgBase;
Connection : import pgbase;
include "pg_catalogue.m";
pgcatalogue : PgCatalogue;
Catalogue : import pgcatalogue;
# XXX
# Record is one in-memory entry of the dbfs-style Database kept in
# `database.records'.  It is inherited from the dbfs template; the PostgreSQL
# catalogue paths (Qproc, Quser, ...) do not use it.
Record: adt {
id: int; # file number in directory
x: int; # index in file
dirty: int; # modified but not written
vers: int; # version
data: array of byte; # record contents; set to nil when the record is removed
# new appends a record holding x to the global database and returns it
new: fn(x: array of byte): ref Record;
# print writes the record's data (if any) to fd
print: fn(r: self ref Record, fd: ref Sys->FD);
# qid builds the Qdata qid for this record from its index x and its version
qid: fn(r: self ref Record): Sys->Qid;
};
# XXX
# Database is the dbfs-style flat-file store: a named file plus the records
# read from it.  Inherited from the dbfs template.
Database: adt {
name: string; # path used by dbsync when rewriting the file
file: ref Iobuf; # buffered handle used by dbread/getrec and dbsync
records: array of ref Record; # all records, indexed by file number
dirty: int; # non-zero when records changed since the last dbsync
vers: int; # incremented each time Record.new adds a record
nextid: int; # set by dbread to the number of records read
# findrec returns the record whose id field equals id, or nil
findrec: fn(db: self ref Database, id: int): ref Record;
};
# Module interface: the only exported entry point is the standard
# Inferno command init(context, argument list).
Pgcfs: module
{
init: fn(nil: ref Draw->Context, nil: list of string);
};
# File-type codes.  QPATH stores one of these in the low 8 bits of a qid path
# and TYPE extracts it again; the remaining bits hold a file number (FILENO).
Qdir, Qnew, Qdata, Qproc, Qprocdir, Quserdir, Quser, Qsysid, Quserprocdir, Quserproc: con iota;
clockfd: ref Sys->FD; # not referenced in the visible source
stderr: ref Sys->FD; # set in init to file descriptor 2
database: ref Database; # dbfs-style store used by serveloop and Record.new
err_chan : chan of string; # log messages consumed by logger(); a nil message stops it
user: string; # contents of /dev/user, or "inferno" if that cannot be read
Eremoved: con "file removed";
pg_cat : ref Catalogue; # PostgreSQL catalogue created in init
# usage prints the command synopsis on stderr and terminates the caller
# by raising "fail:usage".
usage()
{
	synopsis := "Usage: dbfs [-a|-b|-ac|-bc] [-D] file mountpoint\n";
	sys->fprint(stderr, "%s", synopsis);
	raise "fail:usage";
}
# nomod reports that module s could not be loaded (including the system
# error string) and raises "fail:load".
nomod(s: string)
{
	# format first so that %r captures the error from the failed load
	msg := sys->sprint("dbfs: can't load %s: %r\n", s);
	sys->fprint(stderr, "%s", msg);
	raise "fail:load";
}
# open_file opens `file' for reading through bufio.  When the open fails and
# `empty' is non-zero, the file is created (mode 600) provided stat shows it
# does not exist.  Raises "fail:open" if no handle could be obtained.
open_file(file : string, empty : int) : ref Iobuf
{
	iob := bufio->open(file, Sys->OREAD);
	if(iob == nil && empty){
		# only create the file if it is genuinely missing
		(status, nil) := sys->stat(file);
		if(status < 0)
			iob = bufio->create(file, Sys->OREAD, 8r600);
	}
	if(iob == nil){
		sys->fprint(stderr, "dbfs: can't open %s: %r\n", file);
		raise "fail:open";
	}
	return iob;
}
# create_pipes returns the two ends of a freshly created pipe,
# raising "fail:pipe" if the pipe cannot be made.
create_pipes() : array of ref Sys->FD
{
	pair := array[2] of ref Sys->FD;
	rc := sys->pipe(pair);
	if(rc < 0){
		sys->fprint(stderr, "dbfs: can't create pipe: %r\n");
		raise "fail:pipe";
	}
	return pair;
}
# init loads the required modules, connects to PostgreSQL, builds the
# catalogue, starts the logger, navigator and serve loop, and mounts the
# resulting Styx service on /mnt/db.
#
# args is handed to arg->init but no options are parsed; the connection
# parameters and the mount point are hard-coded below.
# Raises "fail:load", "catalogue creation failed", "fail:pipe" or "fail:mount".
init(nil: ref Draw->Context, args: list of string)
{
	sys = load Sys Sys->PATH;
	sys->pctl(Sys->FORKFD|Sys->NEWPGRP, nil);
	stderr = sys->fildes(2);

	styx = load Styx Styx->PATH;
	if(styx == nil)
		nomod(Styx->PATH);
	styx->init();
	styxservers = load Styxservers Styxservers->PATH;
	if(styxservers == nil)
		nomod(Styxservers->PATH);
	styxservers->init(styx);
	bufio = load Bufio Bufio->PATH;
	if(bufio == nil)
		nomod(Bufio->PATH);
	arg := load Arg Arg->PATH;
	if(arg == nil)
		nomod(Arg->PATH);
	arg->init(args);

	# the logger must be running before anything sends on err_chan
	err_chan = chan of string;
	spawn logger();

	pgcatalogue = load PgCatalogue "pg_catalogue.dis";
	# like every other module above: fail with a diagnostic rather than
	# dereferencing a nil module handle on the next line
	if(pgcatalogue == nil)
		nomod("pg_catalogue.dis");
	conn := pgcatalogue->new_connection("127.0.0.1", "5432", "www", "", "study", nil, nil);
	pg_cat = pgcatalogue->new_catalogue(conn);
	if(pg_cat == nil)
		raise "catalogue creation failed";
	pg_cat.err_chan = err_chan;
	pg_cat.sync();

	flags := Sys->MREPL;
	flags |= Sys->MCREATE;
	mountpt := "/mnt/db";

	sys->pctl(Sys->FORKFD, nil);

	user = rf("/dev/user");
	if(user == nil)
		user = "inferno";

	fds := create_pipes();

	navops := chan of ref Navop;
	spawn navigator(navops);

	(tchan, srv) := Styxserver.new(fds[0], Navigator.new(navops), big Qdir);
	fds[0] = nil;

	# NOTE(review): this starts the dbfs-style serveloop, which reads
	# `database' -- a global that is not assigned anywhere in the visible
	# source.  serveloopX is the variant that serves from pg_cat; confirm
	# which of the two is meant to run here.
	pidc := chan of int;
	spawn serveloop(tchan, srv, pidc, navops);
	<-pidc;	# wait until the server has forked its name space

	if(sys->mount(fds[1], nil, mountpt, flags, nil) < 0) {
		sys->fprint(stderr, "dbfs: mount failed: %r\n");
		raise "fail:mount";
	}
}
# logger drains err_chan until it receives a nil message.  Messages are
# printed only when the compile-time `debug' constant is non-zero.
logger()
{
	for(;;){
		msg := <-err_chan;
		if(msg == nil)
			break;
		if(debug)
			sys->print("%s\n", msg);
	}
	if(debug)
		sys->print("Closing\n");
}
# rf returns the first Sys->NAMEMAX bytes (at most) of file f as a string,
# or nil if the file cannot be opened or read.
rf(f: string): string
{
	fd := sys->open(f, Sys->OREAD);
	if(fd == nil)
		return nil;
	buf := array[Sys->NAMEMAX] of byte;
	nread := sys->read(fd, buf, len buf);
	if(nread >= 0)
		return string buf[0:nread];
	return nil;
}
#XXX
# dbread rewinds db.file, reads every record with getrec and installs them in
# db.records, numbering id and x by position in the file.  Returns (db, nil)
# on success or (nil, error) as soon as getrec reports an error.
dbread(db: ref Database): (ref Database, string)
{
	db.file.seek(big 0, Sys->SEEKSTART);
	pending: list of ref Record;
	count := 0;
	for(;;){
		(rec, e) := getrec(db);
		if(e != nil)
			return (nil, e);	# could press on without it, or make it the `file' contents
		if(rec == nil)
			break;
		pending = rec :: pending;
		count++;
	}
	db.nextid = count;
	db.records = array[count] of ref Record;
	# the list holds the newest record first, so fill the array from the back
	for(slot := count - 1; pending != nil; slot--){
		rec := hd pending;
		pending = tl pending;
		rec.id = slot;
		rec.x = slot;
		db.records[slot] = rec;
	}
	return (db, nil);
}
#XXX
# getrec reads lines from db.file up to an empty line or end of file and
# returns them as one Record (id and x are -1 until the caller numbers it).
# Returns (nil, nil) when end of file is reached before any text was read.
# A final line without a newline has one appended.
getrec(db: ref Database): (ref Record, string)
{
	text := "";
	for(;;){
		line := db.file.gets('\n');
		if(line == nil){
			if(text == nil)
				return (nil, nil);	# BUG: distinguish i/o error from EOF?
			break;
		}
		# possibly truncated: tolerate a missing trailing newline
		if(line[len line - 1] != '\n')
			line += "\n";
		if(line == "\n")
			break;
		text += line;
	}
	return (ref Record(-1, -1, 0, 0, array of byte text), nil);
}
#XXX
# dbsync rewrites the database file from db.records when db.dirty is set.
# Each record with non-nil data is written followed by a newline.
# Returns 0 on success (or when nothing needed writing) and -1 on failure.
dbsync(db: ref Database): int
{
	if(!db.dirty)
		return 0;
	db.file = bufio->create(db.name, Sys->OWRITE, 8r666);
	if(db.file == nil)
		return -1;
	for(i := 0; i < len db.records; i++){
		rec := db.records[i];
		# removed records keep their slot but have no data
		if(rec == nil || rec.data == nil)
			continue;
		nbytes := len rec.data;
		if(db.file.write(rec.data, nbytes) != nbytes)
			return -1;
		db.file.putc('\n');
	}
	if(db.file.flush())
		return -1;
	db.file = nil;
	db.dirty = 0;
	return 0;
}
# dbprint writes every record of db to standard output, each followed
# by a newline.
dbprint(db: ref Database)
{
	out := sys->fildes(1);
	i := 0;
	while(i < len db.records){
		db.records[i].print(out);
		sys->print("\n");
		i++;
	}
}
# findrec scans db.records and returns the first non-nil record whose id
# field equals id, or nil if there is none.
Database.findrec(db: self ref Database, id: int): ref Record
{
	for(i := 0; i < len db.records; i++){
		rec := db.records[i];
		if(rec == nil)
			continue;
		if(rec.id == id)
			return rec;
	}
	return nil;
}
# Record.new creates a record holding `fields', appends it to the global
# database (growing the records array by one), bumps database.vers and
# returns the new record.  Its id and x are its index in the array.
Record.new(fields: array of byte): ref Record
{
	slot := len database.records;
	grown := array[slot+1] of ref Record;
	if(slot != 0)
		grown[0:] = database.records[0:];
	rec := ref Record(slot, slot, 0, 0, fields);
	grown[slot] = rec;
	database.records = grown;
	database.vers++;
	return rec;
}
# Record.print writes the record's data to fd; a record without data
# produces no output.
Record.print(r: self ref Record, fd: ref Sys->FD)
{
	if(r.data == nil)
		return;
	sys->write(fd, r.data, len r.data);
}
# Record.qid returns the qid of the record's data file: a plain file whose
# path combines the record index x with type Qdata, versioned by r.vers.
Record.qid(r: self ref Record): Sys->Qid
{
	path := QPATH(r.x, Qdata);
	return Sys->Qid(path, r.vers, Sys->QTFILE);
}
# serveloop is the dbfs-style Styx message loop: it serves Qnew/Qdata files
# out of the global `database'.  It reports its pid on pidc after forking the
# name space, handles T-messages from tchan until nil or a read error, and
# finally sends nil on navops to stop the navigator.
# NOTE(review): `database' is not assigned anywhere in the visible source;
# confirm it is initialised before any record access below is reached.
serveloop(tchan: chan of ref Tmsg, srv: ref Styxserver, pidc: chan of int, navops: chan of ref Navop)
{
# new name space; keep only stdout, stderr and the server's own fd
pidc <-= sys->pctl(Sys->FORKNS|Sys->NEWFD, 1::2::srv.fd.fd::nil);
Serve:
while((gm := <-tchan) != nil){
pick m := gm {
Readerror =>
sys->fprint(stderr, "dbfs: fatal read error: %s\n", m.error);
break Serve;
Open =>
# only opening the Qnew file is special; everything else is default
c := srv.getfid(m.fid);
if(c == nil || TYPE(c.path) != Qnew){
srv.open(m); # default action
break;
}
# only the user read from /dev/user may create records
if(c.uname != user) {
srv.reply(ref Rmsg.Error(m.tag, Eperm));
break;
}
mode := styxservers->openmode(m.mode);
if(mode < 0) {
srv.reply(ref Rmsg.Error(m.tag, Ebadarg));
break;
}
# generate new file, change Fid's qid to match
r := Record.new(array[0] of byte);
qid := r.qid();
c.open(mode, qid);
srv.reply(ref Rmsg.Open(m.tag, qid, srv.iounit()));
Read =>
(c, err) := srv.canread(m);
if(c == nil){
srv.reply(ref Rmsg.Error(m.tag, err));
break;
}
if(c.qtype & Sys->QTDIR){
srv.read(m); # does readdir
break;
}
# NOTE(review): the index is taken from the qid path without a bounds check
r := database.records[FILENO(c.path)];
if(r == nil)
srv.reply(ref Rmsg.Error(m.tag, Eremoved));
else
srv.reply(styxservers->readbytes(m, r.data));
Write =>
(c, merr) := srv.canwrite(m);
if(c == nil){
srv.reply(ref Rmsg.Error(m.tag, merr));
break;
}
# the written data must be a complete, well-formed record
(value, err) := data2rec(m.data);
if(err != nil){
srv.reply(ref Rmsg.Error(m.tag, err));
break;
}
fno := FILENO(c.path);
r := database.records[fno];
if(r == nil){
srv.reply(ref Rmsg.Error(m.tag, Eremoved));
break;
}
# each write replaces the whole record and is flushed immediately
r.data = value;
r.vers++;
database.dirty++;
if(dbsync(database) == 0)
srv.reply(ref Rmsg.Write(m.tag, len m.data));
else
srv.reply(ref Rmsg.Error(m.tag, sys->sprint("%r")));
Clunk =>
# a transaction-oriented dbfs could delay updating the record until clunk
srv.clunk(m);
Remove =>
c := srv.getfid(m.fid);
if(c == nil || c.qtype & Sys->QTDIR || TYPE(c.path) != Qdata){
# let it diagnose all the errors
srv.remove(m);
break;
}
# a removed record keeps its slot; only its data is dropped
r := database.records[FILENO(c.path)];
if(r != nil)
r.data = nil;
database.dirty++;
srv.delfid(c);
if(dbsync(database) == 0)
srv.reply(ref Rmsg.Remove(m.tag));
else
srv.reply(ref Rmsg.Error(m.tag, sys->sprint("%r")));
Wstat =>
srv.default(gm); # TO DO?
* =>
srv.default(gm);
}
}
navops <-= nil; # shut down navigator
}
# dirslot returns the index in database.records of the n'th (0-based) record
# that still has data, or -1 if there are fewer than n+1 such records.
dirslot(n: int): int
{
	remaining := n;
	for(i := 0; i < len database.records; i++){
		rec := database.records[i];
		# skip empty slots and removed records
		if(rec == nil || rec.data == nil)
			continue;
		if(remaining == 0)
			return i;
		remaining--;
	}
	return -1;
}
#
# a record is (.+\n)*, without final empty line
#
# data2rec validates that data is a sequence of non-empty, newline-terminated
# lines.  Returns (data, nil) when valid (an empty array is valid) and
# (nil, error) otherwise.
data2rec(data: array of byte): (array of byte, string)
{
	rest := data;
	while(len rest > 0){
		line: string;
		(rest, line) = getline(rest);
		wellformed := line != nil && line[len line - 1] == '\n' && line != "\n";
		if(!wellformed)
			return (nil, "partial or malformed record");	# possibly truncated
	}
	return (data, nil);
}
# getline splits the first line off b.
# Returns (remainder of b after the line, the line as a string).  The line
# includes its terminating '\n' when one is present; without one the line is
# everything up to the end of b or up to an incomplete UTF-8 sequence.
#
# The index is advanced only by the length byte2char reports.  Advancing it a
# second time in the loop header would skip every other byte, missing
# newlines at odd offsets and running past the end of b.
getline(b: array of byte): (array of byte, string)
{
	n := len b;
	i := 0;
	while(i < n){
		(ch, l, nil) := sys->byte2char(b, i);
		# zero bytes consumed: the input ends in the middle of a character
		if(l == 0)
			break;
		i += l;
		if(ch == '\n')
			break;
	}
	return (b[i:], string b[0:i]);
}
# serveloopX is the Styx message loop that serves files from the PostgreSQL
# catalogue (pg_cat).  It reports its pid on pidc after forking the name
# space, handles T-messages from tchan until nil or a read error, then stops
# the logger and the navigator and disconnects from the database.
#
# Every request gets exactly one reply: opens are always passed to the
# default handler, and requests on file types this loop does not serve are
# answered with an error rather than a nil reply.
serveloopX(tchan: chan of ref Tmsg, srv: ref Styxserver, pidc: chan of int, navops: chan of ref Navop)
{
	# new name space; keep only stdout, stderr and the server's own fd
	pidc <-= sys->pctl(Sys->FORKNS|Sys->NEWFD, 1::2::srv.fd.fd::nil);
Serve:
	while((gm := <-tchan) != nil){
		err_chan <-= "Serve";
		pick m := gm {
		Readerror =>
			err_chan <-= "Readerror";
			sys->fprint(stderr, "dbfs: fatal read error: %s\n", m.error);
			break Serve;
		Open =>
			err_chan <-= "Open";
			# the default action replies both for unknown fids (error)
			# and for valid ones; no file type needs special handling
			srv.open(m);
			err_chan <-= "Opened";
		Read =>
			err_chan <-= "Read";
			(c, err) := srv.canread(m);
			if(c == nil){
				srv.reply(ref Rmsg.Error(m.tag, err));
				break;
			}
			if(c.qtype & Sys->QTDIR){
				srv.read(m);	# does readdir
				break;
			}
			case TYPE(c.path) {
			Qproc =>
				err_chan <-= "Qproc";
				pid := FILENO(c.path);
				if(pid < len pg_cat.procs)
					srv.reply(styxservers->readbytes(m, array of byte pg_cat.proc_sql(pid)));
				else
					srv.reply(ref Rmsg.Error(m.tag, Eremoved));
			Qsysid =>
				err_chan <-= "Qsysid";
				# contents match the length dirgen reports: the decimal sysid
				srv.reply(styxservers->readbytes(m, array of byte string FILENO(c.path)));
			* =>
				err_chan <-= "*";
				srv.reply(ref Rmsg.Error(m.tag, Eperm));
			}
		Write =>
			err_chan <-= "Write";
			(c, merr) := srv.canwrite(m);
			if(c == nil){
				srv.reply(ref Rmsg.Error(m.tag, merr));
				break;
			}
			case TYPE(c.path) {
			Qproc =>
				err_chan <-= "Qproc";
				pid := FILENO(c.path);
				# NOTE(review): the data is acknowledged but not stored
				if(pid < len pg_cat.procs)
					srv.reply(ref Rmsg.Write(m.tag, len m.data));
				else
					srv.reply(ref Rmsg.Error(m.tag, Eremoved));
			* =>
				err_chan <-= "*";
				srv.reply(ref Rmsg.Error(m.tag, Eperm));
			}
		Clunk =>
			err_chan <-= "Clunk";
			# a transaction-oriented dbfs could delay updating the record until clunk
			srv.clunk(m);
		Remove =>
			err_chan <-= "Remove";
			c := srv.getfid(m.fid);
			if(c == nil || c.qtype & Sys->QTDIR || TYPE(c.path) != Qproc){
				# let it diagnose all the errors
				err_chan <-= "*";
				srv.remove(m);
				break;
			}
			err_chan <-= "Qproc";
			pid := FILENO(c.path);
			# a remove request always gives up the fid, whatever the outcome
			srv.delfid(c);
			if(pid >= len pg_cat.procs)
				srv.reply(ref Rmsg.Error(m.tag, Eremoved));
			else if(pg_cat.drop_proc(pid))
				srv.reply(ref Rmsg.Remove(m.tag));
			else
				srv.reply(ref Rmsg.Error(m.tag, "Drop failed"));
		Wstat =>
			err_chan <-= "Wstat";
			srv.default(gm);	# TO DO?
		* =>
			err_chan <-= "Default";
			srv.default(gm);
		}
		err_chan <-= "Served";
	}
	err_chan <-= nil;	# stop the logger
	navops <-= nil;	# shut down navigator
	pgcatalogue->disconnect(pg_cat.conn);
}
# dir builds a Sys->Dir for a file with the given qid, name, length, owner
# (used for both uid and gid), permissions and modification time.
# Sys->DMDIR is added to the mode when the qid denotes a directory.
dir(qid: Sys->Qid, name: string, length: big, uid: string, perm, mtime: int): ref Sys->Dir
{
	mode := perm;
	if(qid.qtype & Sys->QTDIR)
		mode |= Sys->DMDIR;

	entry := ref sys->zerodir;
	entry.name = name;
	entry.qid = qid;
	entry.mode = mode;
	entry.uid = uid;
	entry.gid = uid;
	entry.length = length;
	entry.mtime = mtime;
	return entry;
}
# proc_dirname returns the file name used for procedure pg_cat.procs[id]:
# the procedure name, "+", then one comma-separated entry per argument made
# of the argument name followed by "-" (when known) and its type name.
proc_dirname(id : int) : string
{
	pr := pg_cat.procs[id];
	label := pr.name + "+";
	for(k := 0; k < pr.nargs; k++) {
		if(pr.argnames != nil && k < len pr.argnames)
			label += pr.argnames[k] + "-";
		if(pr.argtypes != nil && k < len pr.argtypes)
			label += pg_cat.type_name(pr.argtypes[k]);
		# separator between arguments, none after the last
		if(k + 1 < pr.nargs)
			label += ",";
	}
	return label;
}
# dirgen returns the directory entry for qid path p, or (nil, Enotfound) if
# the path has an unknown type or names a procedure that does not exist.
#
# Directories are mode 555 and files 444; every entry uses
# pg_cat.last_sync as its modification time.  Entries that do not belong to a
# specific catalogue user are owned by pg_cat.users[0].
dirgen(p: big): (ref Sys->Dir, string)
{
	case TYPE(p) {
	Qdir =>
		return (dir(Qid(QPATH(0, Qdir), 0, Sys->QTDIR), "/", big 0, pg_cat.users[0].name, 8r555, pg_cat.last_sync), nil);
	Qprocdir =>
		return (dir(Qid(QPATH(0, Qprocdir), 0, Sys->QTDIR), "proc", big 0, pg_cat.users[0].name, 8r555, pg_cat.last_sync), nil);
	Qproc =>
		id := FILENO(p);
		# a stale or forged path must not index past the procedure table
		if(id >= len pg_cat.procs)
			return (nil, Enotfound);
		src := pg_cat.proc_sql(id);
		return (dir(Qid(QPATH(id, Qproc), 0, Sys->QTFILE), proc_dirname(id), big len src, pg_cat.user_name(pg_cat.procs[id].owner), 8r444, pg_cat.last_sync), nil);
	Quserprocdir =>
		sysid := FILENO(p);
		return (dir(Qid(QPATH(sysid, Quserprocdir), 0, Sys->QTDIR), "proc", big 0, pg_cat.user_name(sysid), 8r555, pg_cat.last_sync), nil);
	Quserdir =>
		return (dir(Qid(QPATH(0, Quserdir), 0, Sys->QTDIR), "user", big 0, pg_cat.users[0].name, 8r555, pg_cat.last_sync), nil);
	Quser =>
		sysid := FILENO(p);
		return (dir(Qid(QPATH(sysid, Quser), 0, Sys->QTDIR), pg_cat.user_name(sysid), big 0, pg_cat.users[0].name, 8r555, pg_cat.last_sync), nil);
	Qsysid =>
		sysid := FILENO(p);
		# the qid must carry type Qsysid: with Quser it would collide with
		# the user directory's own path and the file would be unrecognisable
		return (dir(Qid(QPATH(sysid, Qsysid), 0, Sys->QTFILE), "sysid", big len string sysid, pg_cat.users[0].name, 8r444, pg_cat.last_sync), nil);
	* =>
		err_chan <-= "dirgen *";
		return (nil, Enotfound);
	}
}
# findproc returns the index in pg_cat.procs of the procedure whose file name
# (see proc_dirname) is `name', or -1 if there is none.  When owner >= 0 only
# procedures owned by that sysid are considered.
findproc(name: string, owner: int): int
{
	if(pg_cat.procs == nil)
		pg_cat.fill_procs();
	for(i := 0; i < len pg_cat.procs; i++){
		if(owner >= 0 && pg_cat.procs[i].owner != owner)
			continue;
		if(proc_dirname(i) == name)
			return i;
	}
	return -1;
}

# walkto resolves `name' inside the directory with qid path `parent' and
# returns the entry walked to, or (nil, error).
walkto(parent: big, name: string): (ref Sys->Dir, string)
{
	owner := FILENO(parent);
	case TYPE(parent) {
	Qdir =>
		err_chan <-= " From Qdir";
		# the root is its own parent
		if(name == "..")
			return dirgen(QPATH(0, Qdir));
		if(name == "proc")
			return dirgen(QPATH(0, Qprocdir));
		if(name == "user")
			return dirgen(QPATH(0, Quserdir));
	Qprocdir =>
		err_chan <-= " From Qprocdir";
		if(name == "..")
			return dirgen(QPATH(0, Qdir));
		idx := findproc(name, -1);
		if(idx >= 0)
			return dirgen(QPATH(idx, Qproc));
	Quserdir =>
		err_chan <-= " From Quserdir";
		if(name == "..")
			return dirgen(QPATH(0, Qdir));
		(nil, sysid) := pg_cat.user_sysid(name);
		if(sysid > 0)
			return dirgen(QPATH(sysid, Quser));
	Quser =>
		err_chan <-= " From Quser";
		if(name == "..")
			return dirgen(QPATH(0, Quserdir));
		if(name == "sysid")
			return dirgen(QPATH(owner, Qsysid));
		if(name == "proc")
			return dirgen(QPATH(owner, Quserprocdir));
	Quserprocdir =>
		err_chan <-= " From Quserprocdir";
		if(name == "..")
			return dirgen(QPATH(owner, Quser));
		idx := findproc(name, owner);
		if(idx >= 0)
			return dirgen(QPATH(idx, Qproc));
	* =>
		err_chan <-= " From *";
		return (nil, "not a directory");
	}
	return (nil, Enotfound);
}

# navigator answers the Styxserver's navigation requests (Stat, Walk,
# Readdir) from the PostgreSQL catalogue until it receives nil on navops.
#
# Each Stat and Walk gets exactly one reply.  A Readdir gets its entries
# followed by a single terminator: (nil, nil) normally, or (nil, error) alone
# when the path is not a directory.  Sending anything after the terminator
# would block this process forever, because the requester stops receiving at
# the first nil entry.
navigator(navops: chan of ref Navop)
{
	while((m := <-navops) != nil){
		err_chan <-= "Navops";
		pick n := m {
		Stat =>
			err_chan <-= "Stat";
			n.reply <-= dirgen(n.path);
		Walk =>
			err_chan <-= "Walk to " + n.name;
			n.reply <-= walkto(n.path, n.name);
		Readdir =>
			err_chan <-= "Readdir";
			i := n.offset;
			isdir := 1;
			case TYPE(m.path) {
			Qprocdir =>
				err_chan <-= "Qprocdir";
				if(pg_cat.procs == nil)
					pg_cat.fill_procs();
				if(pg_cat.procs != nil) {
					for(; --n.count >= 0 && i < len pg_cat.procs; i++)
						n.reply <-= dirgen(QPATH(i, Qproc));	# n² but the file will be small
				}
			Quserprocdir =>
				if(pg_cat.procs == nil)
					pg_cat.fill_procs();
				if(pg_cat.procs != nil) {
					sysid := FILENO(m.path);
					# skip `offset' matching entries, then emit up to count
					offset := i;
					for(k := 0; n.count > 0 && k < len pg_cat.procs; k++) {
						if(pg_cat.procs[k].owner != sysid)
							continue;
						if(offset == 0) {
							n.reply <-= dirgen(QPATH(k, Qproc));	# n² but the file will be small
							n.count--;
						} else
							offset--;
					}
				}
			Quserdir =>
				err_chan <-= "Quserdir";
				if(pg_cat.users == nil)
					pg_cat.fill_users();
				if(pg_cat.users != nil) {
					for(; --n.count >= 0 && i < len pg_cat.users; i++)
						n.reply <-= dirgen(QPATH(pg_cat.users[i].sysid, Quser));
				}
			Quser =>
				err_chan <-= "Quser";
				# list the same entries, with the same qids, that a walk
				# to "sysid" and "proc" from this user directory yields
				sysid := FILENO(m.path);
				if(i == 0 && n.count > 0) {
					n.reply <-= dirgen(QPATH(sysid, Qsysid));
					i++;
					--n.count;
				}
				if(i == 1 && n.count-- > 0)
					n.reply <-= dirgen(QPATH(sysid, Quserprocdir));
			Qdir =>
				err_chan <-= "Qdir";
				if(i == 0 && n.count > 0) {
					n.reply <-= dirgen(QPATH(0, Qprocdir));
					i++;
					--n.count;
				}
				if(i == 1 && n.count-- > 0)
					n.reply <-= dirgen(QPATH(0, Quserdir));
			* =>
				err_chan <-= "Not Dir";
				isdir = 0;
				# the error itself terminates the listing
				n.reply <-= (nil, "not a directory");
			}
			if(isdir)
				n.reply <-= (nil, nil);
		}
		err_chan <-= "Navigated";
	}
}
# QPATH packs file number w and file type q into a qid path:
# the type occupies the low 8 bits, the file number the bits above.
QPATH(w, q: int): big
{
	packed := (w << 8) | q;
	return big packed;
}
# TYPE extracts the file type (low 8 bits) from a qid path built by QPATH.
TYPE(path: big): int
{
	low := int path;
	return low & 16rFF;
}
# FILENO extracts the 24-bit file number stored above the type byte
# in a qid path built by QPATH.
FILENO(path: big) : int
{
	shifted := int path >> 8;
	return shifted & 16rFFFFFF;
}
|