// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// This package implements Native Client's simple RPC (SRPC).
package srpc
import (
"bytes";
"log";
"os";
"sync";
)
// A Client represents the client side of an SRPC connection.
// NewClient fills in the service table and then starts the input and
// output goroutines, which read responses from and write requests to
// the connection.
type Client struct {
fd int; // fd to server
// r is the receiving side of the connection; NewClient reads the
// service discovery response from it and input reads from it afterward.
r msgReceiver;
// s is the sending side of the connection; NewClient sends the
// service discovery request on it and output sends on it afterward.
s msgSender;
service map[string]srv; // services by name
out chan *msg; // send to out to write to connection
mu sync.Mutex; // protects pending, idGen
// pending holds the RPCs registered by Start, keyed by request id;
// input removes an entry when the matching response arrives.
pending map[uint64]*RPC;
idGen uint64; // generator for request IDs
}
// A srv is a single method that the server offers.
// NewClient creates one for each line of the service discovery
// response that contains a ':'; the text before the first ':' is the
// method name used as the key in Client.service.
type srv struct {
// num is the index of the method's line in the discovery response.
num uint32; // method number
// fmt is the text after the first ':' of the method's line. Start
// skips it up to the next ':' (the input arguments) and treats each
// following character as the type char of one return value.
fmt string; // argument format
}
// An RPC represents a single RPC issued by a client.
// Create one with Client.NewRPC and issue requests on it with
// Start or Call.
type RPC struct {
// Ret is set by the client's input goroutine from the response message.
Ret []interface{}; // Return values
// Done receives the RPC itself when the request has finished.
Done chan *RPC; // Channel where notification of done arrives
// Errno is reset to OK by Start and set to ErrBadRPCNumber when the
// requested method name is not offered by the server.
Errno Errno; // Status code
c *Client; // client the RPC was created on
id uint64; // request id
}
// NewClient allocates a new client using the file descriptor fd.
//
// Before returning, it performs the service discovery exchange directly
// on the connection: it sends one request and parses the response as
// newline-separated "name:format" lines, recording each as a service
// whose method number is the index of its line. Only after that does it
// start the goroutines that read from and write to the connection.
//
// If the response cannot be received, or it carries a status other than
// OK, NewClient returns a nil client and the error.
func NewClient(fd int) (c *Client, err os.Error) {
	c = &Client{
		fd: fd,
		service: make(map[string]srv),
		pending: make(map[uint64]*RPC),
	};
	c.r.fd = fd;
	c.s.fd = fd;

	// Send the service discovery request: one []byte return value,
	// with a size of 4000.
	req := &msg{
		protocol: protocol,
		isReq: true,
		Ret: []interface{}{[]byte(nil)},
		Size: []int{4000},
	};
	req.packRequest();
	c.s.send(req);

	// Read the reply synchronously; the input goroutine is not running yet.
	var resp *msg;
	resp, err = c.r.recv();
	if err != nil {
		return nil, err;
	}
	resp.unpackResponse();
	if resp.status != OK {
		log.Stderrf("NewClient service_discovery: %s", resp.status);
		return nil, resp.status;
	}

	// Each line with a ':' describes one method. Lines without one are
	// skipped but still consume a method number.
	colon := []byte{':'};
	for num, line := range bytes.Split(resp.Ret[0].([]byte), []byte{'\n'}, 0) {
		if sep := bytes.Index(line, colon); sep >= 0 {
			c.service[string(line[0:sep])] = srv{uint32(num), string(line[sep+1:])};
		}
	}

	c.out = make(chan *msg);
	go c.input();
	go c.output();
	return c, nil;
}
// input receives responses from the connection forever and hands each
// one to the pending RPC with the matching request id: it removes the
// RPC from c.pending, stores the response's return values in Ret, and
// sends the RPC on its Done channel.
//
// A receive error terminates the program via log.Exitf. A response
// that matches no pending RPC is logged and ignored.
//
// NOTE(review): a response whose status is not OK is only logged and
// skipped. If it belonged to a pending RPC, that RPC stays in c.pending
// and is never sent on Done from here. Confirm this is intended.
func (c *Client) input() {
	for {
		resp, err := c.r.recv();
		if err != nil {
			log.Exitf("client recv: %s", err);
		}
		resp.unpackResponse();
		if resp.status != OK {
			log.Stderrf("invalid message: %s", resp.status);
			continue;
		}

		// Claim the matching RPC, if any, while holding the lock.
		c.mu.Lock();
		call, found := c.pending[resp.requestId];
		if found {
			c.pending[resp.requestId] = nil, false;
		}
		c.mu.Unlock();

		if found {
			call.Ret = resp.Ret;
			call.Done <- call;
		} else {
			log.Stderrf("unexpected response");
		}
	}
}
// output writes every message received on c.out to the connection,
// in the order received, so that all sends go through one goroutine.
func (c *Client) output() {
	for req := range c.out {
		c.s.send(req);
	}
}
// NewRPC creates a new RPC on the client connection.
// Completion is reported on done; if done is nil, NewRPC allocates
// an unbuffered channel for the RPC. Each RPC gets the next request
// id from the client's generator.
func (c *Client) NewRPC(done chan *RPC) *RPC {
	if done == nil {
		done = make(chan *RPC);
	}
	rpc := &RPC{Done: done, Errno: OK, c: c};

	// idGen is shared by all callers, so take the id under the lock.
	c.mu.Lock();
	rpc.id = c.idGen;
	c.idGen++;
	c.mu.Unlock();
	return rpc;
}
// Start issues an RPC request for method name with the given arguments.
// The RPC r must not be in use for another pending request.
// To wait for the RPC to finish, receive from r.Done and then
// inspect r.Ret and r.Errno.
//
// If the server does not offer a method called name, no request is
// sent: r.Errno is set to ErrBadRPCNumber and r is delivered on r.Done.
// That delivery happens from a separate goroutine, so Start returns
// even when r.Done is unbuffered and nobody is receiving yet (as is
// the case when Start is invoked from Call).
//
// Otherwise Start registers r as pending, builds the request and
// hands it to the client's output goroutine. It blocks until that
// goroutine accepts the message.
func (r *RPC) Start(name string, arg []interface{}) {
	var m msg;

	r.Errno = OK;
	r.c.mu.Lock();
	method, ok := r.c.service[name];
	if !ok {
		r.c.mu.Unlock();
		r.Errno = ErrBadRPCNumber;
		// Do not send synchronously: the caller may only start
		// receiving from Done after Start has returned.
		go func() { r.Done <- r }();
		return;
	}
	r.c.pending[r.id] = r;
	r.c.mu.Unlock();

	m.protocol = protocol;
	m.requestId = r.id;
	m.isReq = true;
	m.rpcNumber = method.num;
	m.Arg = arg;

	// Fill in the return values and sizes to generate
	// the right type chars. We'll take most any size.

	// Skip over input arguments.
	// We could check them against arg, but the server
	// will do that anyway.
	sep := 0;
	for method.fmt[sep] != ':' {
		sep++;
	}
	retFmt := method.fmt[sep+1:];

	// Now the return prototypes: one entry per return type char.
	// retFmt already excludes the input arguments, so its length is
	// the number of return values.
	m.Ret = make([]interface{}, len(retFmt));
	m.Size = make([]int, len(retFmt));
	for i := 0; i < len(retFmt); i++ {
		switch retFmt[i] {
		case 'b':
			m.Ret[i] = false;
		case 'C':
			m.Ret[i] = []byte(nil);
			m.Size[i] = 1 << 30;
		case 'd':
			m.Ret[i] = float64(0);
		case 'D':
			m.Ret[i] = []float64(nil);
			m.Size[i] = 1 << 30;
		case 'h':
			m.Ret[i] = int(-1);
		case 'i':
			m.Ret[i] = int32(0);
		case 'I':
			m.Ret[i] = []int32(nil);
			m.Size[i] = 1 << 30;
		case 's':
			m.Ret[i] = "";
			m.Size[i] = 1 << 30;
		default:
			log.Exitf("unexpected service type %c", retFmt[i]);
		}
	}

	m.packRequest();
	r.c.out <- &m;
}
// Call is a convenient wrapper that starts the RPC request,
// waits for it to finish, and then returns the results.
// It is equivalent to:
//
//	r.Start(name, arg);
//	<-r.Done;
//	return r.Ret, r.Errno;
//
func (r *RPC) Call(name string, arg []interface{}) (ret []interface{}, err Errno) {
	r.Start(name, arg);
	// Block until completion is signaled on Done.
	<-r.Done;
	ret, err = r.Ret, r.Errno;
	return;
}
|