implement Pg;
include "sys.m";
sys: Sys;
include "bufio.m";
bufio: Bufio;
Iobuf: import bufio;
include "keyring.m";
keyring: Keyring;
include "bytelib.m";
include "pg.m";
debug : con 0;
chomp(io : ref Iobuf) : string
{
t := io.gets(0);
if(len(t) > 0)
t = t[0:len(t) -1];
return t;
}
Data.extend_bytes(data : self ref Data, n : int) : int
{
if(data.bytes == nil) {
data.bytes = array[n] of byte;
return n;
}
bytes := array[len(data.bytes) + n] of byte;
for(i := 0; i < len(data.bytes); i++)
bytes[i] = data.bytes[i];
data.bytes = bytes;
return len(data.bytes);
}
Data.write(data : self ref Data, txt : string) : int
{
if(data.bytes == nil)
data.extend_bytes(3 * len(txt));
bytes_consumed : int;
bytes_written := 0;
for(i := 0; i < len(txt); i++) {
if(len(data.bytes) - data.ptr > 2)
bytes_consumed = sys->char2byte(txt[i], data.bytes, data.ptr);
else
bytes_consumed = 0;
if(bytes_consumed == 0)
break;
data.ptr += bytes_consumed;
bytes_written += bytes_consumed;
}
if(bytes_consumed == 0) {
data.extend_bytes(3 * (len(txt) - i));
bytes_written += data.write(txt[i:]);
}
return bytes_written;
}
Data.append(data : self ref Data, bytes : array of byte) : int
{
if(data.bytes == nil)
data.extend_bytes(len bytes);
else
if (len(data.bytes) - data.ptr < len(bytes))
data.extend_bytes(len(bytes) - (len(data.bytes) - data.ptr));
data.bytes[data.ptr:] = bytes;
data.ptr += len(bytes);
return len(bytes);
}
Data.md5(data : self ref Data) : string
{
keyring = load Keyring Keyring->PATH;
digest := array[16] of byte;
keyring->md5(data.bytes[0:data.ptr], data.ptr, digest, nil);
digest_text := "md5";
for(i:=0; i<16; i++) {
digest_text += sys->sprint("%02x", int(digest[i]));
}
return digest_text;
}
Data.puts(data : self ref Data, txt : string) : int
{
if(txt == nil) return 0;
return data.write(txt) + data.write("\0");
}
Data.put_notnil_s(data : self ref Data, key, value : string) : int
{
if(value != nil)
return data.puts(key) + data.puts(value);
return 0;
}
Data.send(data : self ref Data, io : ref Iobuf, tag : int) : int
{
written := 0;
if(tag > 0) {
written += io.putc(tag);
}
written += int32_write(io, 4 + data.ptr);
if(data.bytes != nil && data.ptr <= len(data.bytes))
written += io.write(data.bytes, data.ptr);
return written;
}
Data.to_string(data : self ref Data) : string
{
i, j, c, k : int;
bytes := int_to_bytes(data.ptr, 4);
out := "";
for(j = 0; j < 4; j++) {
out += sys->sprint("%02x ", int(bytes[j]));
}
out += sys->sprint("[%d]", data.ptr);
k = 0;
for(i = 0; k < data.ptr ; i += 16) {
out += sys->sprint("\n%02X : ", k);
for(j = 0; k < data.ptr && j < 16; j++) {
c = int(data.bytes[k]);
case c {
32 to 126 =>
out += sys->sprint(".%c", c);
* =>
out += sys->sprint(".%02X", c);
}
k++;
}
}
return out;
}
Parameter.read(p : self ref Parameter, io : ref Iobuf) : int
{
p.key = chomp(io);
if(len p.key == 0)
return 0;
p.value = chomp(io);
return 1;
}
Response.read(r : self ref Response, io : ref Iobuf) : int
{
r.code = io.getc();
if(r.code == 0)
return 0;
r.data = chomp(io);
return r.code;
}
Recordset.to_string(r : self ref Recordset) : string
{
str := "Recordset\n";
str += " Fields: " + string len r.fields + "\n";
for(i:=0; i < len(r.fields); i++)
str += " " + string i + " " + r.fields[i].to_string() + "\n";
numrows := len r.rows;
str += " Rows: " + string numrows + "\n";
numcols := len r.rows[i];
str += " Cols: " + string numcols + "\n";
for(i=0; i < numrows; i++) {
str += " " + string i + " ";
for(j:=0; j < numcols; j++) {
str += " ";
if(r.fields[j].format_code == 0)
for(k:=0; k < len(r.rows[i][j]); k++)
str += sys->sprint(".%02X", int r.rows[i][j][k]);
else
str += "X" + string r.rows[i][j];
}
str += "\n";
}
return str;
}
read_responses(io : ref Iobuf) : list of ref Response
{
responses : list of ref Response;
r := ref Response;
while(r.read(io)) {
responses = r :: responses;
r = ref Response;
}
return responses;
}
Field.read(f : self ref Field, io : ref Iobuf)
{
f.name = chomp(io);
f.table_oid = int32_read(io);
f.column_attribute_number = int16_read(io);
f.data_type_oid = int32_read(io);
f.data_type_size = int16_read(io);
f.type_modifier = int32_read(io);
f.format_code = int16_read(io);
}
Field.to_string(f : self ref Field) : string
{
fcode : string;
if(f.format_code == 0)
fcode = "ascii";
else
fcode = "bin";
fmod : string;
if(f.type_modifier != -1)
fmod = "(" + string f.type_modifier + ")";
else
fmod = "";
flen : string;
if(f.data_type_size == -1)
flen = "?";
else
flen = string f.data_type_size;
return sys->sprint("\"%s\" oid:%d.%d type %d[%s]%s %s", f.name, f.table_oid, f.column_attribute_number, f.data_type_oid, flen, fmod, fcode);
}
read_fields(io : ref Iobuf) : array of ref Field
{
field_count := int16_read(io);
fields := array[field_count] of ref Field;
for(i:= 0; i < field_count; i++) {
fields[i] = ref Field;
fields[i].read(io);
}
return fields;
}
Response.to_string(r : self ref Response) : string
{
case r.code {
'P' or 'p'=> return "position " + r.data;
'F'=> return "in file " + r.data;
'L'=> return ":" + r.data;
* => return r.data;
}
}
Parameter.to_string(p : self ref Parameter) : string
{
return p.key + "=" + string p.value;
}
response_by_code(responses : list of ref Response, code : int) : string
{
h : ref Response;
for(t:=responses; t != nil; t = tl t) {
h = hd t;
if(h.code == code) {
return h.to_string();
break;
}
}
return nil;
}
Backend_Message.to_string(b_msg : self ref Backend_Message) : string
{
str : string;
pick msg := b_msg {
Error =>
str = sys->sprint("%s (%s) %s %s%s %s", response_by_code(msg.responses, 'S'), response_by_code(msg.responses, 'C'), response_by_code(msg.responses, 'M'), response_by_code(msg.responses, 'F'), response_by_code(msg.responses, 'L'), response_by_code(msg.responses, 'R') );
h : int;
for(t:=list of {'D', 'H', 'P', 'p', 'q', 'W'}; t != nil; t = tl t) {
h = hd t;
s := response_by_code(msg.responses, h);
if(s != nil) {
str += sys->sprint(" (%c) %s", h, s);
}
}
Authentication =>
case msg.auth_type {
0 =>
str = "AuthenticationOK";
2 =>
str = "Kerberos V5 requested";
3 =>
str = "Clear Text requested";
4 =>
str = sys->sprint("Crypt requested salt %02X%02X", int(msg.salt[0]), int(msg.salt[1]));
5 =>
str = sys->sprint("MD5 requested salt %02X%02X%02X%02X", int(msg.salt[0]), int(msg.salt[1]), int(msg.salt[2]), int(msg.salt[3]));
6 =>
str = "SCM Credential requested";
* =>
str = "Unknown Auth Type";
}
ReadyForQuery =>
str = "ReadyForQuery";
RowDescription =>
str = sys->sprint("Row Description : %d columns\n", len(msg.fields));
for(i:=0; i < len(msg.fields); i++) {
str += sys->sprint(" %s\n", msg.fields[i].to_string());
}
DataRow =>
str = sys->sprint("Data Row column(s):%d \n", len(msg.columns));
for(i:=0; i<len(msg.columns); i++) {
str += sys->sprint("Column %02d\n", i);
if(msg.columns[i] == nil) {
str += "NULL\n";
} else {
d := ref Data;
d.bytes = msg.columns[i];
d.ptr = len(d.bytes);
str += sys->sprint("%s\n", d.to_string());
}
}
CopyData =>
str = "Copy Data\n";
if(msg.data == nil)
str += "NULL";
else {
d := ref Data(msg.data, len msg.data);
str += d.to_string();
}
CopyDone =>
str = "Copy Done";
CopyInResponse => # G
str = sys->sprint("Copy In Response copy format:%d columns:%d\n", int msg.copy_format, len msg.format_codes);
for(i:=0; i<len(msg.format_codes); i++)
str += sys->sprint("Column %02d: %d\n", i, msg.format_codes[i]);
CopyOutResponse => # H
str = sys->sprint("Copy Out Response copy format:%d columns:%d\n", int msg.copy_format, len msg.format_codes);
for(i:=0; i<len(msg.format_codes); i++)
str += sys->sprint("Column %02d: %d\n", i, msg.format_codes[i]);
CommandComplete => # C
str = msg.cmd + " oid:" + string msg.oid + " " + string msg.rows + " row(s)";
ParseComplete => # 1
str = "Parse Complete";
BindComplete => # 2
str = "Bind Complete";
CloseComplete => # 3
str = "Close Complete";
PortalSuspended => # s
str = "Portal Suspended";
EmptyQueryResponse => # l
str = "Empty Query Response";
NoData =>
str = "No Data";
ParameterDescription =>
str = sys->sprint("Parameters %d\n", len(msg.oids));
for(i:=0; i<len(msg.oids); i++)
str += sys->sprint(" %d oid: %d\n", i, msg.oids[i]);
Unknown =>
str = sys->sprint("Unknown TAG %c:\n", msg.tag);
d := ref Data(nil, 0);
d.append(msg.data);
str += sys->sprint("%s\n", d.to_string());
}
return str;
}
Connection.connect(connection: self ref Connection, ip, port, options, parameters : string) : int
{
sys = load Sys Sys->PATH;
bufio = load Bufio Bufio->PATH;
addr := sys->sprint("tcp!%s!%s", ip, port);
(i, c) := sys->dial(addr, nil);
if(i == -1) {
return 0;
}
connection.status = 1;
connection.fd = c.dfd;
connection.rx = chan of ref Backend_Message;
connection.tx = chan of ref Frontend_Message;
connection.notices = chan of ref Backend_Message;
spawn deal_with_notices(connection);
spawn deal_with_incoming(connection);
spawn deal_with_outgoing(connection);
connection.tx <-= ref Frontend_Message.StartupMessage(0, 196608, connection.user, connection.database, options, parameters);
for(msg := <- connection.rx; msg != nil && msg.tag != 'Z'; msg = <- connection.rx);
return 1;
}
Connection.set_parameter(c: self ref Connection, p : ref Parameter)
{
h : ref Parameter;
for(t:=c.parameters; t != nil; t = tl t) {
h = hd t;
if(h.key == p.key) {
h.value = p.value;
break;
}
}
if(t == nil) {
c.parameters = p :: c.parameters;
}
}
Connection.query(c: self ref Connection, sql : string) : ref Recordset
{
c.tx <-= ref Frontend_Message.Query('Q', sql);
r := ref Recordset(nil, nil);
rows : list of array of array of byte = nil;
for(msg := <- c.rx; msg.tag != 'Z'; msg = <- c.rx) {
pick m := msg {
RowDescription =>
r.fields = m.fields;
DataRow =>
rows = m.columns :: rows;
}
}
r.rows = array[len(rows)] of array of array of byte;
for(i := len(rows) -1 ; i >= 0; i--) {
r.rows[i] = hd rows;
rows = tl rows;
}
return r;
}
Connection.parse(c: self ref Connection, name, query : string, data_type_oids : array of int) : int
{
c.tx <-= ref Frontend_Message.Parse('P', name, query, data_type_oids);
c.tx <- = ref Frontend_Message.Sync('S');
for(m := <- c.rx; m.tag != 'E' && m.tag != '1'; m = <- c.rx)
c.notices <- = m;
if(m.tag == '1')
for(m = <- c.rx; m.tag != 'E' && m.tag != 'Z'; m = <- c.rx)
c.notices <- = m;
return m.tag == 'Z';
}
Connection.describe(c: self ref Connection, item_type : byte, name : string)
{
c.tx <- = ref Frontend_Message.Describe('D', item_type, name);
}
Connection.execute(c: self ref Connection, portal, name : string, parameter_format_codes : array of int, parameters : array of array of byte, result_format_codes : array of int, rows_to_return : int) : ref Recordset
{
recordset := ref Recordset(nil, nil);
pfc : array of int;
if(parameter_format_codes == nil)
pfc = array[0] of int;
# else
# parameter_format_codes = pfc;
c.tx <-= ref Frontend_Message.Bind('B', portal, name, pfc, parameters, result_format_codes);
c.tx <-= ref Frontend_Message.Flush('H');
for(msg := <- c.rx; msg.tag != '2' && msg.tag != 'E'; msg = <-c.rx) {
pick m := msg {
RowDescription =>
recordset.fields = m.fields;
}
}
c.tx <-= ref Frontend_Message.Describe('D', byte 'S', name);
c.tx <-= ref Frontend_Message.Flush('H');
# a 't' followed by a 'T'
for(msg = <- c.rx; msg.tag == 't' || msg.tag == 'T'; msg = <-c.rx) {
pick m := msg {
RowDescription =>
recordset.fields = m.fields;
}
if(msg.tag == 'T')
break;
}
if(msg.tag != 'T') return nil;
c.tx <-= ref Frontend_Message.Execute('E', portal, rows_to_return);
c.tx <-= ref Frontend_Message.Sync('S');
rows : list of array of array of byte = nil;
for(msg = <- c.rx; msg.tag != 'C' && msg.tag != 'E' && msg.tag != 'Z'; msg = <-c.rx) {
pick m := msg {
DataRow =>
rows = m.columns :: rows;
}
}
recordset.rows = array[len(rows)] of array of array of byte;
i := len(rows) -1;
while(rows != nil) {
recordset.rows[i--] = hd rows;
rows = tl rows;
}
return recordset;
}
Connection.disconnect(c: self ref Connection)
{
c.status = 0;
c.tx <-= ref Frontend_Message.Terminate;
c.tx <-= nil;
c.notices <- = nil;
}
peek_at_data(io : ref Iobuf, size : int)
{
data := ref Data(array[size] of byte, size);
io.seek(big(-5), bufio->SEEKRELA);
tag := io.getc();
io.seek(big(4), bufio->SEEKRELA);
data.ptr = io.read(data.bytes, size);
io.seek(big(-data.ptr), bufio->SEEKRELA);
sys->print("RX '%c' %s\n", tag, data.to_string());
}
deal_with_outgoing(c : ref Connection)
{
Iobuf : import bufio;
d : ref Data;
d = nil;
tag : int;
io := bufio->fopen(c.fd, Bufio->OWRITE);
for(f_msg := <- c.tx; f_msg != nil; f_msg = <- c.tx) {
pick msg := f_msg {
StartupMessage =>
tag = msg.tag;
if(msg.user == nil)
return;
d = ref Data(int_to_bytes(16r00030000, 4), 4);
d.put_notnil_s("user", msg.user);
d.put_notnil_s("database", msg.database);
d.put_notnil_s("options", msg.options);
d.puts(msg.parameters);
d.write("\0");
PasswordMessage =>
tag = msg.tag;
d = ref Data(nil, 0);
d.puts(msg.password);
Query =>
tag = msg.tag;
d = ref Data(nil, 0);
d.puts(msg.sql);
CopyData =>
tag = msg.tag;
if(msg.data == nil)
d = ref Data(nil, 0);
else
d = ref Data(msg.data, len msg.data);
CopyFail =>
tag = msg.tag;
d = ref Data(nil, 0);
CopyDone =>
tag = msg.tag;
d = ref Data(nil, 0);
Parse =>
tag = msg.tag;
d = ref Data(nil, 0);
if(msg.name == nil)
d.write("\0");
else
d.puts(msg.name);
d.puts(msg.query);
d.append(int_to_bytes(len msg.data_type_oids, 2));
for(i:= 0; i < len(msg.data_type_oids); i++)
d.append(int_to_bytes(msg.data_type_oids[i], 4));
Bind =>
tag = msg.tag;
d = ref Data(nil, 0);
if(msg.portal == "")
d.write("\0");
else
d.puts(msg.portal);
if(msg.name == "")
d.write("\0");
else
d.puts(msg.name);
d.append(int_to_bytes(len msg.parameter_format_codes, 2));
i : int;
for(i = 0; i < len msg.parameter_format_codes; i++)
d.append(int_to_bytes(msg.parameter_format_codes[i], 2));
d.append(int_to_bytes(len msg.parameters, 2));
for(i = 0; i < len msg.parameters; i++) {
if(msg.parameters[i] == nil)
d.append(int_to_bytes(-1, 4));
else {
d.append(int_to_bytes(len msg.parameters[i], 4));
d.append(msg.parameters[i]);
}
}
d.append(int_to_bytes(len msg.result_format_codes, 2));
for(i = 0; i < len msg.result_format_codes; i++)
d.append(int_to_bytes(msg.result_format_codes[i], 2));
Execute =>
tag = msg.tag;
d = ref Data(nil, 0);
if(msg.portal == nil)
d.write("\0");
else
d.puts(msg.portal);
d.append(int_to_bytes(msg.rows_to_return, 4));
Sync =>
tag = msg.tag;
d = ref Data(nil, 0);
Describe =>
tag = msg.tag;
d = ref Data(array[1] of byte, 1);
d.bytes[0] = msg.item_type;
if(msg.name == nil)
d.write("\0");
else
d.puts(msg.name);
Close =>
tag = msg.tag;
d = ref Data(array[1] of byte, 1);
d.bytes[0] = msg.item_type;
d.puts(msg.name);
Flush =>
tag = msg.tag;
d = ref Data(nil, 0);
Terminate =>
tag = msg.tag;
d = ref Data(nil, 0);
}
if(d != nil) {
written := d.send(io, tag);
io.flush();
if(debug)
sys->print("TX %d '%c': %s\n\n", written, tag, d.to_string());
} else {
if(debug)
sys->print("Outgoing data was nil, not sent\n");
}
}
}
deal_with_incoming(c : ref Connection)
{
Iobuf : import bufio;
msg : ref Backend_Message;
size : int;
io := bufio->fopen(c.fd, Bufio->OREAD);
for(tag := io.getc(); tag > 0 && c.status > 0; tag = io.getc()) {
size = int32_read(io) - 4; # includes self
if(debug)
peek_at_data(io, size);
msg = nil;
case tag {
'E' =>
msg = ref Backend_Message.Error(tag, read_responses(io));
c.notices <-= msg;
'N' =>
msg = ref Backend_Message.NoticeResponse(tag, read_responses(io));
c.notices <-= msg;
'R' =>
auth_type := int32_read(io);
salt : array of byte = nil;
case auth_type {
3 =>
c.tx <-= ref Frontend_Message.PasswordMessage('p', c.password);
4 =>
salt = array[2] of byte;
io.read(salt, 2);
5 => # doesn't work :=( hash is wrong
salt = array[4] of byte;
io.read(salt, 4);
d := ref Data(nil, 0);
d.write(c.password);
d.write(c.user);
un_pw_digest := d.md5();
d = ref Data(nil, 0);
d.write(un_pw_digest);
d.append(salt);
c.tx <-= ref Frontend_Message.PasswordMessage('p', d.md5());
}
msg = ref Backend_Message.Authentication(tag, auth_type, salt);
'S' =>
p := ref Parameter;
if(p.read(io) > 0)
c.set_parameter(p);
'K' =>
c.process_id = int32_read(io);
c.key = int32_read(io);
'Z' =>
msg = ref Backend_Message.ReadyForQuery(tag,io.getc());
'T' =>
msg = ref Backend_Message.RowDescription(tag,read_fields(io));
'D' =>
num_cols := int16_read(io);
columns := array[num_cols] of array of byte;
for(i:= 0; i<num_cols; i++) {
data_size := int32_read(io);
if(data_size >= 0) {
columns[i] = array[data_size] of byte;
if(data_size > 0)
io.read(columns[i], data_size);
} else {
columns[i] = nil;
}
}
msg = ref Backend_Message.DataRow(tag, columns);
'd' =>
data := array[size] of byte;
io.read(data, size);
msg = ref Backend_Message.CopyData(tag, data);
'c' =>
msg = ref Backend_Message.CopyDone(tag);
'G' =>
copy_format := byte io.getc();
column_count := int16_read(io);
format_codes := array[column_count] of int;
for(i := 0; i < column_count; i++)
format_codes[i] = int16_read(io);
msg = ref Backend_Message.CopyInResponse(tag, copy_format, format_codes);
'H' =>
copy_format := byte io.getc();
column_count := int16_read(io);
format_codes := array[column_count] of int;
for(i := 0; i < column_count; i++)
format_codes[i] = int16_read(io);
msg = ref Backend_Message.CopyOutResponse(tag, copy_format, format_codes);
'C' =>
oid, rows : int;
(numwords, words) := sys->tokenize(chomp(io), " ");
cmd := hd words;
words = tl words;
case numwords {
3 =>
oid = int hd words;
rows = int hd tl words;
2 =>
oid = 0;
rows = int hd words;
* =>
oid = 0;
rows = 0;
}
msg = ref Backend_Message.CommandComplete(tag, cmd, oid, rows);
c.notices <-= msg;
msg = nil;
'1' =>
msg = ref Backend_Message.ParseComplete(tag);
'2' =>
msg = ref Backend_Message.BindComplete(tag);
'3' =>
msg = ref Backend_Message.CloseComplete(tag);
'I' =>
msg = ref Backend_Message.EmptyQueryResponse(tag);
's' =>
msg = ref Backend_Message.PortalSuspended(tag);
'n' =>
msg = ref Backend_Message.NoData(tag);
't' =>
oid_count := int16_read(io);
oids := array[oid_count] of int;
for(i := 0; i < oid_count; i++)
oids[i] = int32_read(io);
msg = ref Backend_Message.ParameterDescription(tag, oids);
* =>
data := array[size] of byte;
io.read(data, size);
msg = ref Backend_Message.Unknown(tag, data);
}
if(msg != nil) {
if(debug)
sys->print("%s\n\n", msg.to_string());
c.rx <-= msg;
}
}
}
deal_with_notices(c : ref Connection)
{
stderr := sys->fildes(2);
for(msg := <-c.notices; msg != nil; msg = <-c.notices) {
pick m:= msg {
Error =>
if(response_by_code(m.responses, 'S') == "FATAL") {
c.disconnect();
}
}
if(debug)
sys->fprint(stderr, "%s\n", msg.to_string());
}
}
|