#include "deluge.h"
typedef struct Listenargs Listenargs;
typedef struct Readerargs Readerargs;
typedef struct Writerargs Writerargs;
typedef struct Readmsg Readmsg;
typedef struct Hsin Hsin;
typedef struct Trackreq Trackreq;
typedef struct Stale Stale;
struct Listenargs {
char *adir;
Channel *newpeer;
Torrent *t;
};
struct Readerargs {
Channel *out;
Channel *stalechan;
Peer *p;
int nbytes;
};
struct Writerargs {
Channel *in;
Channel *out;
int fd;
Channel *stalechan;
int peern;
};
struct Readmsg {
Msg *m;
};
struct Hsin {
Peer *p;
uchar *infohash;
uchar *ourpeerid;
};
struct Trackreq {
char *announce;
char *listenport;
uchar *infohash;
uchar *peerid;
int npieces;
vlong ul, dl, left;
char *event;
};
struct Stale {
int peern;
Peer *p;
int bad;
};
void
torrentinit(Torrent *t, Bee *b, int needopen, int maycreate)
{
Bee*info;
int i;
char *p;
int plen;
uchar *pieces;
vlong pieceslen;
vlong piecelen;
t->filecreated = 0;
ebeedictget(b, "info", TDict, &info, nil);
beepickle(info, &p, &plen);
sha1((uchar *)p, plen, t->infohash, nil);
free(p);
ebeedictget(b, "announce", TString, &t->announce, nil);
ebeedictget(info, "piece length", TInteger, &piecelen, nil);
t->piecelen = (ulong)piecelen;
ebeedictget(b, "info", TDict, &info, nil);
fileinit(t, info, needopen, maycreate);
ebeedictget(info, "pieces", TString, &pieces, &pieceslen);
if(pieceslen % Piecehashlen != 0)
sysfatal("string pieces has invalid size (%lld %% %d != 0)", pieceslen, Piecehashlen);
t->npieces = pieceslen / Piecehashlen;
DEBUG(2, "have npieces=%d\n", t->npieces);
t->pieces = emalloc(sizeof t->pieces[0] * t->npieces);
for(i = 0; i < t->npieces; i++){
piecelen = t->piecelen;
if(i == t->npieces - 1)
piecelen = t->length - ((t->npieces - 1)*t->piecelen);
pieceinit(&t->pieces[i], i, &pieces[i*Piecehashlen], piecelen);
}
t->haves = bitnew(t->npieces, nil);
t->stored = 0;
DEBUG(2, "done for torrentinit\n");
}
void
torrentverify(Torrent *t)
{
int i;
for(i = 0; i < t->npieces; i++){
if(filepiecehashokay(t, &t->pieces[i])){
bitset(t->haves, i);
t->stored += t->pieces[i].length;
}
}
DEBUG(2, "done for torrentverify\n");
}
int
nchoked(Torrent *t)
{
int n;
Peer *p;
n = 0;
for(p = t->peers; p; p = p->next)
if(IsChoked(p))
n++;
return n;
}
void
kickwriter(Peer *p)
{
if(p->meta){
if(nbsendp(p->write, p->meta) != 0){
p->meta = p->meta->next;
return;
}
}
if(p->dataout){
if(nbsendp(p->write, p->dataout) != 0){
p->dataout = p->dataout->next;
return;
}
}
}
void
sendstatusmsg(Peer *p, int type)
{
Msg *m;
m = msgnew(type, p->n);
msgappend(&p->meta, m);
kickwriter(p);
}
void
choke(Peer *p)
{
Bite *b;
DEBUG(2, "choked peer=%d\n", p->n);
p->changed = 1;
setischoked(p, 1);
p->lastchange = time(0);
while(p->rreq){
b = p->rreq;
p->rreq = b->next;
bitefree(b);
}
sendstatusmsg(p, MChoke);
}
void
unchoke(Peer *p)
{
DEBUG(2, "unchoked peer=%d\n", p->n);
p->changed = 1;
setischoked(p, 0);
p->lastchange = time(0);
sendstatusmsg(p, MUnchoke);
}
void
interestedin(Peer *p)
{
DEBUG(2, "now interested in peer=%d\n", p->n);
p->changed = 1;
setisinteresting(p, 1);
p->lastchange = time(0);
sendstatusmsg(p, MInterested);
}
void
notinterestedin(Peer *p)
{
DEBUG(2, "now notinterested in peer=%d\n", p->n);
p->changed = 1;
setisinteresting(p, 0);
p->lastchange = time(0);
sendstatusmsg(p, MNotinterested);
}
void
cancel(Peer *p, Bite *b)
{
Msg *m;
Bite *b2;
b2 = bitefind(p->lreq, b->piecen, b->offset, b->length);
if(b2 == nil)
return;
biteremove(&p->lreq, b2);
bitefree(b2);
for(m = p->meta; m; m = m->next){
if(m->index == b->piecen && m->begin == b->offset && m->length == b->length){
msgremove(&p->meta, m);
msgfree(m);
DEBUG(2, "cancel: cancelled not-yet-sent request\n");
return;
}
}
m = msgnew(MCancel, p->n);
m->index = b->piecen;
m->begin = b->offset;
m->length = b->length;
msgappend(&p->meta, m);
kickwriter(p);
}
void
request(Torrent *t, Peer *p, Bite *b)
{
Piece *pc;
Msg *m;
pc = &t->pieces[b->piecen];
bitset(t->reqpieces, b->piecen);
bitset(pc->reqbites, b->n);
biteappend(&p->lreq, b);
m = msgnew(MRequest, p->n);
m->index = b->piecen;
m->begin = b->offset;
m->length = b->length;
msgappend(&p->meta, m);
kickwriter(p);
}
void
have(Peer *p, int piecen)
{
Msg *m;
m = msgnew(MHave, p->n);
m->have = piecen;
msgappend(&p->meta, m);
kickwriter(p);
}
void
stale(Channel *c, int bad, int peern, Peer *p)
{
Stale s;
assert((peern == -1 || p == nil) && (peern != -1 || p != nil));
s.bad = bad;
s.p = p;
s.peern = peern;
send(c, &s);
}
void
disconnect(Torrent *t, Peer *p, int bad)
{
Bite *b, *b2;
Msg *m, *m2;
Piece *pc;
int i;
b = p->lreq;
while(b){
pc = &t->pieces[b->piecen];
bitunset(pc->reqbites, b->n);
b2 = b;
b = b->next;
bitefree(b2);
}
while(b){
b2 = b;
b = b->next;
bitefree(b2);
}
m = p->meta;
while(m){
m2 = m;
m = m->next;
msgfree(m2);
}
m = p->dataout;
while(m){
m2 = m;
m = m->next;
msgfree(m2);
}
p->lreq = p->rreq = nil;
p->meta = p->dataout = nil;
for(i = 0; i < t->npieces; i++)
if(bitget(p->pieces, i))
pieceremoveid(&t->pieces[i], p->n);
close(p->fd);
p->status &= ~(Valid|Connecting);
peerremove(&t->peers, p);
if(bad)
peeradd(&t->bad, p);
else
peerfree(p);
}
Peer *
randomunchokable(Torrent *t)
{
Peer *p;
Peer **r = nil;
int rn = 0;
for(p = t->peers; p; p = p->next){
if(IsInterested(p) && IsChoked(p) && !p->changed && Stable(p)){
r = erealloc(r, (rn+1) * sizeof r[0]);
r[rn++] = p;
}
}
if(rn == 0)
return nil;
p = r[rand() % rn];
free(r);
return p;
}
Peer *
worstchokable(Torrent *t)
{
Peer *p, *rp;
int type;
type = (t->strategy == Seeding) ? Upload : Download;
rp = nil;
for(p = t->peers; p; p = p->next)
if(!p->changed && !IsChoked(p) && Stable(p) &&
(rp == nil || (ratesum(p->rate, 20, type) < ratesum(rp->rate, 20, type))))
rp = p;
return rp;
}
int
cmp(vlong v1, vlong v2)
{
if(v1 < v2)
return -1;
if(v1 > v2)
return 1;
return 0;
}
int
deserveunchokecmp(void *pp1, void *pp2, int which)
{
Peer *p1, *p2;
vlong v1, v2;
p1 = *(Peer **)pp1;
p2 = *(Peer **)pp2;
if(IsInterested(p1) && !IsInterested(p2))
return -1;
if(!IsInterested(p1) && IsInterested(p2))
return 1;
v1 = ratesum(p1->rate, 20, which);
v2 = ratesum(p2->rate, 20, which);
return cmp(v1, v2);
}
// XXX unchoking in seed mode should be done based on round robin, otherwise a fast leecher dominates a seed.
int
deserveunchokecmp_seeding(void *pp1, void *pp2)
{
return deserveunchokecmp(pp1, pp2, Upload);
}
int
deserveunchokecmp_notseeding(void *pp1, void *pp2)
{
return deserveunchokecmp(pp1, pp2, Download);
}
void
rethinkunchokes(Torrent *t, Peer **cp, int *ncp)
{
Peer **r, **rp, *p;
int i;
r = emalloc(peerlen(t->peers) * sizeof r[0]);
rp = r;
for(p = t->peers; p; p = p->next)
*rp++ = p;
qsort(r, peerlen(t->peers), sizeof (Peer *), t->strategy == Seeding ? deserveunchokecmp_seeding : deserveunchokecmp_notseeding);
for(i = 0; i < MIN(peerlen(t->peers), WantUnchokedCount); i++)
cp[i] = r[i];
*ncp = i;
free(r);
}
void
listener(void *ap)
{
Listenargs *la;
Channel *newpeer;
char *adir;
int lcfd, dfd;
char ldir[NETPATHLEN];
NetConnInfo *nci;
Peer *p;
Torrent *t;
la = ap;
newpeer = la->newpeer;
adir = la->adir;
t = la->t;
free(ap);
for(;;){
DEBUG(2, "listening for incoming request\n");
lcfd = listen(adir, ldir);
if(lcfd < 0){
fprint(2, "listen proc exiting...\n");
return;
}
dfd = accept(lcfd, ldir);
if(dfd < 0){
DEBUG(2, "accept failed: %r\n");
continue;
}
nci = getnetconninfo(nil, dfd);
p = peernew(t->npieces);
p->fd = dfd;
p->ip = estrdup(nci->rsys);
p->port = estrdup(nci->rserv);
sendp(newpeer, p);
freenetconninfo(nci);
DEBUG(2, "newpeer has been send\n");
}
}
void
timer(void *p)
{
Channel *c;
c = p;
for(;;){
sendp(c, nil);
sleep(1000);
}
}
void
reader(void *ap)
{
Readerargs *rdargs;
Channel *out;
Channel *stalechan;
Msg *m;
char *errmsg;
Peer *p;
char buf[4];
int len;
char *msgbuf;
int nbytes;
long n;
rdargs = ap;
p = rdargs->p;
out = rdargs->out;
stalechan = rdargs->stalechan;
nbytes = rdargs->nbytes;
free(rdargs);
for(;;){
m = emalloc(sizeof m[0]);
n = readn(p->fd, buf, 4);
if(n <= 0){
free(m);
DEBUG(2, "reader: only read %ld out of 4: %r\n", n);
stale(stalechan, 0, p->n, nil);
return;
}
len = GET32(buf);
if(len > msgmaxlen(nbytes)){
free(m);
DEBUG(2, "reader: message too long (%d)\n", len);
stale(stalechan, 0, p->n, nil);
return;
}
msgbuf = emalloc(len);
n = readn(p->fd, msgbuf, len);
if(n < 0 || (len > 0 && n == 0)){
free(m);
free(msgbuf);
DEBUG(2, "reader: only read %ld out of %d: %r\n", n, len);
stale(stalechan, 0, p->n, nil);
return;
}
errmsg = msgparse(msgbuf, len, m, nbytes);
free(msgbuf);
if(errmsg){
DEBUG(2, "reader: parsing message: %s\n", errmsg);
free(m);
free(errmsg);
stale(stalechan, 0, p->n, nil);
return;
}
m->peern = p->n;
DEBUG(2, "reader: %M\n", m);
sendp(out, m);
}
}
void
writer(void *ap)
{
Writerargs *wrargs;
Channel *in, *out, *stalechan;
Msg *m;
char *errmsg;
int fd;
int peern;
wrargs = ap;
in = wrargs->in;
out = wrargs->out;
stalechan = wrargs->stalechan;
fd = wrargs->fd;
peern = wrargs->peern;
free(wrargs);
for(;;){
m = recvp(in);
errmsg = msgwrite(fd, m);
if(errmsg){
DEBUG(2, "writer: error: %s\n", errmsg);
free(errmsg);
msgfree(m);
stale(stalechan, 0, peern, nil);
return;
}
DEBUG(2, "writer: %M\n", m);
sendp(out, m);
}
}
void
dialer(void *ap)
{
Channel *in;
Channel *out;
Channel *stalec;
Channel **chans;
Peer *p;
chans = ap;
in = chans[0];
out = chans[1];
stalec = chans[2];
free(ap);
for(;;){
recv(in, &p);
DEBUG(2, "dialer: connecting to ip=%s port=%s\n", p->ip, p->port);
p->fd = dial(netmkaddr(p->ip, 0, p->port), 0, 0, 0);
if(p->fd < 0){
DEBUG(2, "dialing %s: %r\n", netmkaddr(p->ip, 0, p->port));
stale(stalec, 1, -1, p);
continue;
}
sendp(out, p);
}
}
void
handshaker(void *ap)
{
Channel *in;
Channel *out;
Channel *stalec;
Channel **chans;
Hsin hi;
char *errmsg;
uchar peerid[Peeridlen];
Peer *p;
uchar *infohash;
uchar *ourpeerid;
chans = ap;
in = chans[0];
out = chans[1];
stalec = chans[2];
free(ap);
for(;;){
recv(in, &hi);
p = hi.p;
infohash = hi.infohash;
ourpeerid = hi.ourpeerid;
errmsg = peerwritestart(p->fd, infohash, ourpeerid);
if(errmsg){
DEBUG(2, "handshaker: %s\n", errmsg);
free(errmsg);
stale(stalec, 1, -1, p);
continue;
}
errmsg = peerreadstart(p->fd, infohash, peerid);
if(errmsg){
DEBUG(2, "handshaker: reading handshake: %s\n", errmsg);
free(errmsg);
stale(stalec, 1, -1, p);
continue;
}
if(memcmp(p->peerid, "\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", Peeridlen) == 0){
memmove(p->peerid, peerid, Peeridlen);
}else{
if(memcmp(p->peerid, peerid, Peeridlen) != 0){
DEBUG(2, "handshaker: unexpected peerid\n");
stale(stalec, 1, -1, p);
continue;
}
}
sendp(out, p);
}
}
void
tracker(void *ap)
{
Channel *in;
Channel *prospect;
Channel *ival;
Channel **chans;
Bee trackerb;
Bee *tmp, *tmp2;
Bee *ipb, *portb, *peeridb;
Trackreq tr;
ulong interval;
char *errmsg;
int i;
Peer *p;
chans = ap;
in = chans[0];
prospect = chans[1];
ival = chans[2];
free(ap);
for(;;){
recv(in, &tr);
DEBUG(2, "calling trackerget\n");
errmsg = trackerget(&trackerb, tr.announce, tr.listenport, tr.infohash, tr.peerid, tr.ul, tr.dl, tr.left, tr.event);
if(errmsg){
DEBUG(2, "trackerget: %s", errmsg);
continue;
}
if(debug)
beeprint(2, 0, &trackerb);
tmp = beedictget(&trackerb, "interval", TInteger);
if(tmp == nil){
DEBUG(2, "missing key interval in initial response from tracker");
continue;
}
interval = (ulong)tmp->i;
tmp = beedictget(&trackerb, "peers", TList);
if(tmp == nil){
DEBUG(2, "missing key peers in initial response from tracker");
continue;
}
for(i = 0; i < tmp->nl; i++){
tmp2 = &tmp->l[i];
if(tmp2->type != TDict)
DEBUG(2, "element in peers list is not dict\n");
ipb = beedictget(tmp2, "ip", TString);
portb = beedictget(tmp2, "port", TInteger);
peeridb = beedictget(tmp2, "peer id", TString);
if(ipb == nil || portb == nil || peeridb == nil){
DEBUG(2, "missing key ipb or portb or peeridb");
continue;
}
/* XXX check if lenghts are okay */
p = peernew(tr.npieces);
p->status |= Connecting;
p->ip = estrdup(ipb->s);
p->port = smprint("%lld", portb->i);
memmove(p->peerid, peeridb->s, Peeridlen);
sendp(prospect, p);
}
send(ival, &interval);
beefree(&trackerb);
}
}
void
torrentprep(Torrent *t)
{
char adir[NETPATHLEN];
int acfd;
Listenargs *la;
int port;
char buf[16];
Channel **dialargs;
Channel **hsargs;
Channel **trargs;
int i;
t->inpeer = chancreate(sizeof (Peer *), 0);
t->timer = chancreate(sizeof (void *), 0);
t->inmsg = chancreate(sizeof (void *), 0);
t->dialpeer = chancreate(sizeof (Peer *), 0);
t->dialedpeer = chancreate(sizeof (Peer *), 0);
t->needshake = chancreate(sizeof (Hsin), 0);
t->shakedpeer = chancreate(sizeof (Peer *), 0);
t->stalepeer = chancreate(sizeof (Stale), 0);
t->track = chancreate(sizeof (Trackreq), 0);
t->prospect = chancreate(sizeof (Peer *), 0);
t->newinterval = chancreate(sizeof (ulong), 0);
t->written = chancreate(sizeof (Msg *), 0);
genrandom(t->peerid, sizeof t->peerid);
t->strategy = Random;
t->dl = 0;
t->ul = 0;
t->rate = ratenew(60);
t->interval = -1;
t->listenport = nil;
t->peers = nil;
t->bad = nil;
t->peern = 0;
t->todial = nil;
t->tohandshake = nil;
t->reqpieces = bitnew(t->npieces, nil);
port = Listenport;
do {
snprint(buf, sizeof buf, "%d", port);
acfd = announce(netmkaddr("*", 0, buf), adir);
} while(acfd < 0 && ++port < Listenport + Listenattempts);
if(acfd < 0)
sysfatal("could not announce: %r");
DEBUG(2, "we have announced...\n");
t->listenport = estrdup(buf);
la = emalloc(sizeof la[0]);
la->adir = estrdup(adir);
la->newpeer = t->dialedpeer;
la->t = t;
proccreate(listener, la, STACKSIZE);
proccreate(timer, t->timer, STACKSIZE);
for(i = 0; i < 1; i++){
dialargs = emalloc(sizeof dialargs[0] * 3);
dialargs[0] = t->dialpeer;
dialargs[1] = t->dialedpeer;
dialargs[2] = t->stalepeer;
proccreate(dialer, dialargs, STACKSIZE);
}
for(i = 0; i < 1; i++){
hsargs = emalloc(sizeof hsargs[0] * 3);
hsargs[0] = t->needshake;
hsargs[1] = t->shakedpeer;
hsargs[2] = t->stalepeer;
proccreate(handshaker, hsargs, STACKSIZE);
}
trargs = emalloc(sizeof trargs[0] * 3);
trargs[0] = t->track;
trargs[1] = t->prospect;
trargs[2] = t->newinterval;
proccreate(tracker, trargs, STACKSIZE);
}
void
trackrequest(Torrent *t, char *event)
{
Trackreq tr;
tr.announce = t->announce;
tr.listenport = t->listenport;
tr.infohash = t->infohash;
tr.peerid = t->peerid;
tr.npieces = t->npieces;
tr.ul = t->ul;
tr.dl = t->dl;
tr.left = t->length - t->stored;
tr.event = event;
send(t->track, &tr);
}
static void
handlemsg(Torrent *t, Peer *p, Msg *m)
{
Bite *b;
Peer *p2;
Piece *pc;
Msg *m2;
Msg **mp;
switch(m->type){
case MKeepalive:
DEBUG(2, "handlemsg: keepalive...\n");
break;
case MChoke:
setischoking(p, 1);
/* all requests we made to peer are now invalid, remove them */
while(b = p->lreq){
/* XXX we may still have this bite requested at another peer */
pc = &t->pieces[b->piecen];
bitunset(pc->reqbites, b->n);
p->lreq = b->next;
bitefree(b);
}
/* also remove requests we were stilll going to send */
mp = &p->meta;
while(m2 = *mp){
if(m2->type == MRequest){
*mp = m2->next;
msgfree(m2);
}
mp = &(*mp)->next;
}
DEBUG(2, "handlemsg: we are now choked by peer=%d\n", p->n);
break;
case MUnchoke:
setischoking(p, 0);
DEBUG(2, "handlemsg: we are now unchoked by peer=%d\n", p->n);
if(IsInteresting(p))
piecepeerschedule(t, p);
break;
case MInterested:
DEBUG(2, "handlemsg: peer=%d is now interested\n", p->n);
setisinterested(p, 1);
if(IsChoked(p) && nchoked(t) < WantUnchokedCount)
unchoke(p);
break;
case MNotinterested:
DEBUG(2, "handlemsg: peer=%d is not interested anymore\n", p->n);
setisinterested(p, 0);
if(!IsChoked(p))
choke(p);
break;
case MHave:
DEBUG(2, "handlemsg: peer=%d now has piece %uld\n", p->n, m->have);
if(m->have >= t->npieces){
DEBUG(2, "handlemsg: peer=%d sent m->have=%uld (>= npieces=%d)\n", p->n, m->have, t->npieces);
break;
}
if(bitget(p->pieces, m->have)){
DEBUG(2, "handlemsg: we already knew peer=%d had piece %uld\n", p->n, m->have);
break;
}
bitset(p->pieces, m->have);
if(!IsInteresting(p) && bitinvertandlen(t->haves, p->pieces) > 0)
interestedin(p);
break;
case MBitfield:
if(p->havefirst){
DEBUG(2, "handlemsg: have bitfield after first message\n");
disconnect(t, p, 1);
}
if(bitnbytes(m->haves) != bitnbytes(p->pieces)){
DEBUG(2, "handlemsg: wrong size of bitfield\n");
disconnect(t, p, 1);
}
bitcopy(p->pieces, m->haves, bitlen(p->pieces));
if(t->strategy == Seeding && bithaveall(p->pieces)){
DEBUG(2, "handlemsg: disconnecting from other seeder\n");
disconnect(t, p, 0);
break;
}
pieceaddpeerhaves(t, p);
DEBUG(2, "handlemsg: first message is bitfield (peer has %.2f%%)\n", 100*bithave(p->pieces));
if(bitinvertandlen(t->haves, p->pieces) > 0)
interestedin(p);
break;
case MRequest:
DEBUG(2, "handlemsg: peer=%d requests piece=%uld begin=%uld length=%uld\n", p->n, m->index, m->begin, m->length);
if(IsChoked(p)){
DEBUG(2, "handlemsg: received request from choked peer, ignoring...\n");
break;
}
/* XXX see that queue isn't growing too large */
b = bitenew(-1, m->index, m->begin, m->length);
biteappend(&p->rreq, b);
kickwriter(p);
break;
case MPiece:
if(m->index >= t->npieces){
DEBUG(2, "handlemsg: peer=%d sent out-of-bound piecen=%uld\n", p->n, m->index);
break;
}
b = bitefind(p->lreq, m->index, m->begin, m->length);
if(b == nil){
DEBUG(2, "handlemsg: incoming piece wasn't requested, ignoring...\n");
break;
}
biteremove(&p->lreq, b);
if(bitehave(t, b)){
DEBUG(2, "handlemsg: already have incoming piece, ignoring...\n");
break;
}
for(p2 = t->peers; p2; p2 = p2->next)
cancel(p2, b);
pc = &t->pieces[b->piecen];
bitunset(pc->reqbites, b->n);
filewrite(t, m->index*t->piecelen + m->begin, m->length, m->piece);
rateadd(p->rate, 0, m->length);
rateadd(t->rate, 0, m->length);
pc = &t->pieces[m->index];
bitset(pc->bites, b->n);
if(bithaveall(pc->bites)){
if(filepiecehashokay(t, pc)){
for(p2 = t->peers; p2; p2 = p2->next){
have(p2, m->index);
if(bitinvertandlen(t->haves, p->pieces) == 0)
notinterestedin(p);
}
bitset(t->haves, pc->n);
t->stored += m->length;
strategy(t);
DEBUG(2, "handlemsg: have valid hash for piecen=%d\n", pc->n);
}else{
bitclear(pc->bites);
bitclear(pc->reqbites);
DEBUG(2, "handlemsg: invalid hash, have to recheck all\n");
}
}
bitefree(b);
piecepeerschedule(t, p);
break;
case MCancel:
DEBUG(2, "handlemsg: peer=%d cancels piece=%uld begin=%uld length=%uld\n", p->n, m->index, m->index, m->length);
b = bitefind(p->rreq, m->index, m->begin, m->length);
if(b == nil){
DEBUG(2, "handlemsg: could not find rreq to cancel\n");
break;
}
biteremove(&p->rreq, b);
bitefree(b);
break;
default:
sysfatal("cannot happen in handlemsg\n");
}
p->havefirst = 1;
}
void
torrentdo(Torrent *t)
{
ulong interval;
Peer *p;
Hsin hi;
Msg *m;
Readerargs *rdargs;
Writerargs *wrargs;
Peer *nc[WantUnchokedCount];
int n;
int i;
ulong now;
Peer *pnew, *pworst;
Stale stale;
Bite *b;
Alt a[] = {
{t->timer, nil, CHANRCV},
{t->inpeer, &p, CHANRCV},
{t->inmsg, &m, CHANRCV},
{t->dialedpeer, &p, CHANRCV},
{t->shakedpeer, &p, CHANRCV},
{t->stalepeer, &stale, CHANRCV},
{t->prospect, &p, CHANRCV},
{t->newinterval, &interval, CHANRCV},
{t->dialpeer, nil, CHANSND},
{t->needshake, nil, CHANSND},
{t->written, &m, CHANRCV},
{nil, nil, CHANEND},
};
strategy(t);
for(;;){
a[8].v = &t->todial;
a[8].op = t->todial ? CHANSND : CHANNOP;
hi.p = t->tohandshake;
hi.infohash = t->infohash;
hi.ourpeerid = t->peerid;
a[9].v = &hi;
a[9].op = hi.p ? CHANSND : CHANNOP;
switch(alt(a)){
case 0: /* timer */
DEBUG(2, "do: timer npeers=%d nchoked=%d\n", peerlen(t->peers), nchoked(t));
now = time(0);
ratetick(t->rate);
for(p = t->peers; p; p = p->next)
ratetick(p->rate);
for(p = t->peers; p; p = p->next)
p->changed = 0;
if(now % 10 == 0){
rethinkunchokes(t, nc, &n);
for(i = 0; i < n; i++)
if(IsChoked(nc[i]))
unchoke(nc[i]);
for(p = t->peers; p; p = p->next)
if(!IsChoked(p) && !p->changed)
choke(p);
}
if(now % 30 == 0){
pnew = randomunchokable(t);
if(pnew){
pworst = worstchokable(t);
if(pworst){
choke(pworst);
unchoke(pnew);
}
}
}
if(now % 60 == 0){
for(p = t->peers; p; p = p->next)
if(!p->changed && p->lastchange < now-60 && ratesum(p->rate, 60, (t->strategy == Seeding) ? Upload : Download) == 0)
choke(p);
}
if(t->interval < 0 || time(0) % t->interval == 0){
trackrequest(t, (t->interval < 0) ? "started" : "");
if(t->interval < 0)
t->interval = DefaultInterval;
}
for(p = t->peers; p; p = p->next)
p->changed = 0;
break;
case 1: /* new peer that connected to us */
if(peermatch(t->peers, p) || peermatchbyip(t->bad, p)){
disconnect(t, p, 0);
break;
}
peeradd(&t->tohandshake, p);
break;
case 2: /* incoming message */
p = peerfind(t->peers, m->peern);
if(p)
handlemsg(t, p, m);
msgfree(m);
break;
case 3: /* dialed peer */
if(t->strategy == Seeding){
disconnect(t, p, 0);
break;
}
peeradd(&t->tohandshake, p);
break;
case 4: /* handshaked peer */
if(peermatch(t->peers, p) || peermatchbyip(t->bad, p)){
disconnect(t, p, 0);
break;
}
p->status &= ~Connecting;
p->status |= Valid;
p->n = t->peern++;
p->next = nil;
peeradd(&t->peers, p);
rdargs = emalloc(sizeof rdargs[0]);
rdargs->out = t->inmsg;
rdargs->stalechan = t->stalepeer;
rdargs->p = p;
rdargs->nbytes = bitnbytes(t->haves);
proccreate(reader, rdargs, STACKSIZE);
wrargs = emalloc(sizeof wrargs[0]);
wrargs->in = p->write;
wrargs->out = t->written;
wrargs->fd = p->fd;
wrargs->stalechan = t->stalepeer;
wrargs->peern = p->n;
proccreate(writer, wrargs, STACKSIZE);
if(bitnhave(t->haves) > 0){
m = msgnew(MBitfield, p->n);
m->haves = bitnew(bitlen(t->haves), nil);
bitcopy(m->haves, t->haves, bitlen(m->haves));
}else{
m = msgnew(MKeepalive, p->n);
}
msgappend(&p->meta, m);
kickwriter(p);
if(IsInterested(p) && IsChoked(p) && nchoked(t) < WantUnchokedCount){
unchoke(p);
}
break;
case 5: /* stale peer */
DEBUG(2, "do: stale peer\n");
if(stale.p == nil)
p = peerfind(t->peers, stale.peern);
if(p == nil)
break;
disconnect(t, p, stale.bad);
break;
case 6: /* new prospect peer */
if(t->strategy == Seeding || peermatch(t->peers, p) || peermatchbyip(t->bad, p)){
disconnect(t, p, 0);
break;
}
DEBUG(2, "do: new prospect peer (%s %s %H)\n", p->ip, p->port, p->peerid);
peeradd(&t->todial, p);
break;
case 7: /* new interval */
t->interval = MAX(MinInterval, interval);
DEBUG(2, "new interval=%d\n", t->interval);
break;
case 8: /* send to dialer */
peerpop(&t->todial);
break;
case 9: /* send to handshaker */
peerpop(&t->tohandshake);
break;
case 10: /* message has been written to peer */
p = peerfind(t->peers, m->peern);
if(p == nil)
break;
if(m->type == MPiece){
rateadd(p->rate, m->length, 0);
rateadd(t->rate, m->length, 0);
}
msgfree(m);
if(p->meta){
sendp(p->write, p->meta);
p->meta = p->meta->next;
}
else if(p->dataout){
sendp(p->write, p->dataout);
p->dataout = p->dataout->next;
}else if(p->rreq){
b = p->rreq;
p->rreq = p->rreq->next;
b->next = nil;
m = msgnew(MPiece, -1);
m->piece = emalloc(b->length);
m->length = b->length;
m->index = b->piecen;
m->begin = b->offset;
fileread(t, b->piecen*t->piecelen + b->n*Bitelength, b->length, m->piece);
bitefree(b);
sendp(p->write, m);
}
break;
default:
sysfatal("invalid case in alt\n");
}
}
}
|