// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package rpc
import (
"bufio";
"gob";
"http";
"io";
"log";
"net";
"os";
"sync";
)
// Call represents an active RPC.
type Call struct {
ServiceMethod string; // The name of the service and method to call.
Args interface{}; // The argument to the function (*struct).
Reply interface{}; // The reply from the function (*struct).
Error os.Error; // After completion, the error status.
Done chan *Call; // Strobes when call is complete; value is the error status.
seq uint64;
}
// Client represents an RPC Client.
// There may be multiple outstanding Calls associated
// with a single Client.
type Client struct {
mutex sync.Mutex; // protects pending, seq
shutdown os.Error; // non-nil if the client is shut down
sending sync.Mutex;
seq uint64;
conn io.ReadWriteCloser;
enc *gob.Encoder;
dec *gob.Decoder;
pending map[uint64]*Call;
}
func (client *Client) send(c *Call) {
// Register this call.
client.mutex.Lock();
if client.shutdown != nil {
c.Error = client.shutdown;
client.mutex.Unlock();
_ = c.Done <- c; // do not block
return;
}
c.seq = client.seq;
client.seq++;
client.pending[c.seq] = c;
client.mutex.Unlock();
// Encode and send the request.
request := new(Request);
client.sending.Lock();
request.Seq = c.seq;
request.ServiceMethod = c.ServiceMethod;
client.enc.Encode(request);
err := client.enc.Encode(c.Args);
if err != nil {
panicln("rpc: client encode error:", err.String())
}
client.sending.Unlock();
}
func (client *Client) input() {
var err os.Error;
for err == nil {
response := new(Response);
err = client.dec.Decode(response);
if err != nil {
if err == os.EOF {
err = io.ErrUnexpectedEOF
}
break;
}
seq := response.Seq;
client.mutex.Lock();
c := client.pending[seq];
client.pending[seq] = c, false;
client.mutex.Unlock();
err = client.dec.Decode(c.Reply);
// Empty strings should turn into nil os.Errors
if response.Error != "" {
c.Error = os.ErrorString(response.Error)
} else {
c.Error = nil
}
// We don't want to block here. It is the caller's responsibility to make
// sure the channel has enough buffer space. See comment in Go().
_ = c.Done <- c; // do not block
}
// Terminate pending calls.
client.mutex.Lock();
client.shutdown = err;
for _, call := range client.pending {
call.Error = err;
_ = call.Done <- call; // do not block
}
client.mutex.Unlock();
log.Stderr("rpc: client protocol error:", err);
}
// NewClient returns a new Client to handle requests to the
// set of services at the other end of the connection.
func NewClient(conn io.ReadWriteCloser) *Client {
client := new(Client);
client.conn = conn;
client.enc = gob.NewEncoder(conn);
client.dec = gob.NewDecoder(conn);
client.pending = make(map[uint64]*Call);
go client.input();
return client;
}
// DialHTTP connects to an HTTP RPC server at the specified network address.
func DialHTTP(network, address string) (*Client, os.Error) {
conn, err := net.Dial(network, "", address);
if err != nil {
return nil, err
}
io.WriteString(conn, "CONNECT "+rpcPath+" HTTP/1.0\n\n");
// Require successful HTTP response
// before switching to RPC protocol.
resp, err := http.ReadResponse(bufio.NewReader(conn));
if err == nil && resp.Status == connected {
return NewClient(conn), nil
}
if err == nil {
err = os.ErrorString("unexpected HTTP response: " + resp.Status)
}
conn.Close();
return nil, &net.OpError{"dial-http", network + " " + address, nil, err};
}
// Dial connects to an RPC server at the specified network address.
func Dial(network, address string) (*Client, os.Error) {
conn, err := net.Dial(network, "", address);
if err != nil {
return nil, err
}
return NewClient(conn), nil;
}
// Go invokes the function asynchronously. It returns the Call structure representing
// the invocation. The done channel will signal when the call is complete by returning
// the same Call object. If done is nil, Go will allocate a new channel.
// If non-nil, done must be buffered or Go will deliberately crash.
func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call {
c := new(Call);
c.ServiceMethod = serviceMethod;
c.Args = args;
c.Reply = reply;
if done == nil {
done = make(chan *Call, 10) // buffered.
} else {
// If caller passes done != nil, it must arrange that
// done has enough buffer for the number of simultaneous
// RPCs that will be using that channel. If the channel
// is totally unbuffered, it's best not to run at all.
if cap(done) == 0 {
log.Crash("rpc: done channel is unbuffered")
}
}
c.Done = done;
if client.shutdown != nil {
c.Error = client.shutdown;
_ = c.Done <- c; // do not block
return c;
}
client.send(c);
return c;
}
// Call invokes the named function, waits for it to complete, and returns its error status.
func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) os.Error {
if client.shutdown != nil {
return client.shutdown
}
call := <-client.Go(serviceMethod, args, reply, nil).Done;
return call.Error;
}
|