Plan 9 from Bell Labs’s /usr/web/sources/contrib/mjl/wip/deluge/torrent.c

Copyright © 2021 Plan 9 Foundation.
Distributed under the MIT License.
Download the Plan 9 distribution.


#include "deluge.h"


typedef struct Listenargs Listenargs;
typedef struct Readerargs Readerargs;
typedef struct Writerargs Writerargs;
typedef struct Readmsg Readmsg;
typedef struct Hsin Hsin;
typedef struct Trackreq Trackreq;
typedef struct Stale Stale;

struct Listenargs {
	char *adir;
	Channel *newpeer;
	Torrent *t;
};

struct Readerargs {
	Channel *out;
	Channel *stalechan;
	Peer *p;
	int nbytes;
};

struct Writerargs {
	Channel *in;
	Channel *out;
	int fd;
	Channel *stalechan;
	int peern;
};

struct Readmsg {
	Msg *m;
};

struct Hsin {
	Peer *p;
	uchar *infohash;
	uchar *ourpeerid;
};

struct Trackreq {
	char *announce;
	char *listenport;
	uchar *infohash;
	uchar *peerid;
	int npieces;
	vlong ul, dl, left;
	char *event;
};

struct Stale {
	int peern;
	Peer *p;
	int bad;
};


void
torrentinit(Torrent *t, Bee *b, int needopen, int maycreate)
{
	Bee*info;
	int i;
	char *p;
	int plen;
	uchar *pieces;
	vlong pieceslen;
	vlong piecelen;

	t->filecreated = 0;

	ebeedictget(b, "info", TDict, &info, nil);
	beepickle(info, &p, &plen);
	sha1((uchar *)p, plen, t->infohash, nil);
	free(p);

	ebeedictget(b, "announce", TString, &t->announce, nil);
	ebeedictget(info, "piece length", TInteger, &piecelen, nil);
	t->piecelen = (ulong)piecelen;
	ebeedictget(b, "info", TDict, &info, nil);
	fileinit(t, info, needopen, maycreate);
	ebeedictget(info, "pieces", TString, &pieces, &pieceslen);

	if(pieceslen % Piecehashlen != 0)
		sysfatal("string pieces has invalid size (%lld %% %d != 0)", pieceslen, Piecehashlen);
	t->npieces = pieceslen / Piecehashlen;
	DEBUG(2, "have npieces=%d\n", t->npieces);
	t->pieces = emalloc(sizeof t->pieces[0] * t->npieces);
	for(i = 0; i < t->npieces; i++){
		piecelen = t->piecelen;
		if(i == t->npieces - 1)
			piecelen = t->length - ((t->npieces - 1)*t->piecelen);
		pieceinit(&t->pieces[i], i, &pieces[i*Piecehashlen], piecelen);
	}
	t->haves = bitnew(t->npieces, nil);
	t->stored = 0;

	DEBUG(2, "done for torrentinit\n");
}

void
torrentverify(Torrent *t)
{
	int i;

	for(i = 0; i < t->npieces; i++){
		if(filepiecehashokay(t, &t->pieces[i])){
			bitset(t->haves, i);
			t->stored += t->pieces[i].length;
		}
	}
	DEBUG(2, "done for torrentverify\n");
}


int
nchoked(Torrent *t)
{
	int n;
	Peer *p;

	n = 0;
	for(p = t->peers; p; p = p->next)
		if(IsChoked(p))
			n++;
	return n;
}


void
kickwriter(Peer *p)
{
	if(p->meta){
		if(nbsendp(p->write, p->meta) != 0){
			p->meta = p->meta->next;
			return;
		}
	}
	if(p->dataout){
		if(nbsendp(p->write, p->dataout) != 0){
			p->dataout = p->dataout->next;
			return;
		}
	}
}

void
sendstatusmsg(Peer *p, int type)
{
	Msg *m;

	m = msgnew(type, p->n);
	msgappend(&p->meta, m);
	kickwriter(p);
}
	

void
choke(Peer *p)
{
	Bite *b;

	DEBUG(2, "choked peer=%d\n", p->n);
	p->changed = 1;
	setischoked(p, 1);
	p->lastchange = time(0);
	while(p->rreq){
		b = p->rreq;
		p->rreq = b->next;
		bitefree(b);
	}
	sendstatusmsg(p, MChoke);
}


void
unchoke(Peer *p)
{
	DEBUG(2, "unchoked peer=%d\n", p->n);
	p->changed = 1;
	setischoked(p, 0);
	p->lastchange = time(0);
	sendstatusmsg(p, MUnchoke);
}

void
interestedin(Peer *p)
{
	DEBUG(2, "now interested in peer=%d\n", p->n);
	p->changed = 1;
	setisinteresting(p, 1);
	p->lastchange = time(0);
	sendstatusmsg(p, MInterested);
}

void
notinterestedin(Peer *p)
{
	DEBUG(2, "now notinterested in peer=%d\n", p->n);
	p->changed = 1;
	setisinteresting(p, 0);
	p->lastchange = time(0);
	sendstatusmsg(p, MNotinterested);
}

void
cancel(Peer *p, Bite *b)
{
	Msg *m;
	Bite *b2;

	b2 = bitefind(p->lreq, b->piecen, b->offset, b->length);
	if(b2 == nil)
		return;

	biteremove(&p->lreq, b2);
	bitefree(b2);

	for(m = p->meta; m; m = m->next){
		if(m->index == b->piecen && m->begin == b->offset && m->length == b->length){
			msgremove(&p->meta, m);
			msgfree(m);
			DEBUG(2, "cancel: cancelled not-yet-sent request\n");
			return;
		}
	}

	m = msgnew(MCancel, p->n);
	m->index = b->piecen;
	m->begin = b->offset;
	m->length = b->length;
	msgappend(&p->meta, m);
	kickwriter(p);
}

void
request(Torrent *t, Peer *p, Bite *b)
{
	Piece *pc;
	Msg *m;

	pc = &t->pieces[b->piecen];
	bitset(t->reqpieces, b->piecen);
	bitset(pc->reqbites, b->n);

	biteappend(&p->lreq, b);

	m = msgnew(MRequest, p->n);
	m->index = b->piecen;
	m->begin = b->offset;
	m->length = b->length;
	msgappend(&p->meta, m);
	kickwriter(p);
}

void
have(Peer *p, int piecen)
{
	Msg *m;

	m = msgnew(MHave, p->n);
	m->have = piecen;
	msgappend(&p->meta, m);
	kickwriter(p);
}


void
stale(Channel *c, int bad, int peern, Peer *p)
{
	Stale s;

	assert((peern == -1 || p == nil) && (peern != -1 || p != nil));
	s.bad = bad;
	s.p = p;
	s.peern = peern;
	send(c, &s);
}


void
disconnect(Torrent *t, Peer *p, int bad)
{
	Bite *b, *b2;
	Msg *m, *m2;
	Piece *pc;
	int i;

	b = p->lreq;
	while(b){
		pc = &t->pieces[b->piecen];
		bitunset(pc->reqbites, b->n);
		b2 = b;
		b = b->next;
		bitefree(b2);
	}
	while(b){
		b2 = b;
		b = b->next;
		bitefree(b2);
	}
	m = p->meta;
	while(m){
		m2 = m;
		m = m->next;
		msgfree(m2);
	}
	m = p->dataout;
	while(m){
		m2 = m;
		m = m->next;
		msgfree(m2);
	}
	p->lreq = p->rreq = nil;
	p->meta = p->dataout = nil;

	for(i = 0; i < t->npieces; i++)
		if(bitget(p->pieces, i))
			pieceremoveid(&t->pieces[i], p->n);
		
	close(p->fd);
	p->status &= ~(Valid|Connecting);
	peerremove(&t->peers, p);
	if(bad)
		peeradd(&t->bad, p);
	else
		peerfree(p);
}


Peer *
randomunchokable(Torrent *t)
{
	Peer *p;
	Peer **r = nil;
	int rn = 0;

	for(p = t->peers; p; p = p->next){
		if(IsInterested(p) && IsChoked(p) && !p->changed && Stable(p)){
			r = erealloc(r, (rn+1) * sizeof r[0]);
			r[rn++] = p;
		}
	}
	if(rn == 0)
		return nil;
	p = r[rand() % rn];
	free(r);
	return p;
}


Peer *
worstchokable(Torrent *t)
{
	Peer *p, *rp;
	int type;

	type = (t->strategy == Seeding) ? Upload : Download;
	rp = nil;
	for(p = t->peers; p; p = p->next)
		if(!p->changed && !IsChoked(p) && Stable(p) &&
		   (rp == nil || (ratesum(p->rate, 20, type) <  ratesum(rp->rate, 20, type))))
			rp = p;
	return rp;
}


int
cmp(vlong v1, vlong v2)
{
	if(v1 < v2)
		return -1;
	if(v1 > v2)
		return 1;
	return 0;
}


int
deserveunchokecmp(void *pp1, void *pp2, int which)
{
	Peer *p1, *p2;
	vlong v1, v2;

	p1 = *(Peer **)pp1;
	p2 = *(Peer **)pp2;

	if(IsInterested(p1) && !IsInterested(p2))
		return -1;
	if(!IsInterested(p1) && IsInterested(p2))
		return 1;

	v1 = ratesum(p1->rate, 20, which);
	v2 = ratesum(p2->rate, 20, which);
	return cmp(v1, v2);
}



// XXX unchoking in seed mode should be done based on round robin, otherwise a fast leecher dominates a seed.
int
deserveunchokecmp_seeding(void *pp1, void *pp2)
{
	return deserveunchokecmp(pp1, pp2, Upload);
}


int
deserveunchokecmp_notseeding(void *pp1, void *pp2)
{
	return deserveunchokecmp(pp1, pp2, Download);
}


void
rethinkunchokes(Torrent *t, Peer **cp, int *ncp)
{
	Peer **r, **rp, *p;
	int i;

	r = emalloc(peerlen(t->peers) * sizeof r[0]);
	rp = r;
	for(p = t->peers; p; p = p->next)
		*rp++ = p;
	qsort(r, peerlen(t->peers), sizeof (Peer *), t->strategy == Seeding ?  deserveunchokecmp_seeding : deserveunchokecmp_notseeding);
	for(i = 0; i < MIN(peerlen(t->peers), WantUnchokedCount); i++)
		cp[i] = r[i];
	*ncp = i;
	free(r);
}


void
listener(void *ap)
{
	Listenargs *la;
	Channel *newpeer;
	char *adir;
	int lcfd, dfd;
	char ldir[NETPATHLEN];
	NetConnInfo *nci;
	Peer *p;
	Torrent *t;

	la = ap;
	newpeer = la->newpeer;
	adir = la->adir;
	t = la->t;
	free(ap);

	for(;;){
		DEBUG(2, "listening for incoming request\n");

		lcfd = listen(adir, ldir);

		if(lcfd < 0){
			fprint(2, "listen proc exiting...\n");
			return;
		}
		dfd = accept(lcfd, ldir);
		if(dfd < 0){
			DEBUG(2, "accept failed: %r\n");
			continue;
		}

		nci = getnetconninfo(nil, dfd);

		p = peernew(t->npieces);
		p->fd = dfd;
		p->ip = estrdup(nci->rsys);
		p->port = estrdup(nci->rserv);
		sendp(newpeer, p);
		freenetconninfo(nci);

		DEBUG(2, "newpeer has been send\n");
	}
}


void
timer(void *p)
{
	Channel *c;

	c = p;
	for(;;){
		sendp(c, nil);
		sleep(1000);
	}
}

void
reader(void *ap)
{
	Readerargs *rdargs;
	Channel *out;
	Channel *stalechan;
	Msg *m;
	char *errmsg;
	Peer *p;
	char buf[4];
	int len;
	char *msgbuf;
	int nbytes;
	long n;

	rdargs = ap;
	p = rdargs->p;
	out = rdargs->out;
	stalechan = rdargs->stalechan;
	nbytes = rdargs->nbytes;
	free(rdargs);

	for(;;){
		m = emalloc(sizeof m[0]);
		n = readn(p->fd, buf, 4);
		if(n <= 0){
			free(m);
			DEBUG(2, "reader: only read %ld out of 4: %r\n", n);
			stale(stalechan, 0, p->n, nil);
			return;
		}
		len = GET32(buf);
		if(len > msgmaxlen(nbytes)){
			free(m);
			DEBUG(2, "reader: message too long (%d)\n", len);
			stale(stalechan, 0, p->n, nil);
			return;
		}
		msgbuf = emalloc(len);
		n = readn(p->fd, msgbuf, len);
		if(n < 0 || (len > 0 && n == 0)){
			free(m);
			free(msgbuf);
			DEBUG(2, "reader: only read %ld out of %d: %r\n", n, len);
			stale(stalechan, 0, p->n, nil);
			return;
		}

		errmsg = msgparse(msgbuf, len, m, nbytes);
		free(msgbuf);
		if(errmsg){
			DEBUG(2, "reader: parsing message: %s\n", errmsg);
			free(m);
			free(errmsg);
			stale(stalechan, 0, p->n, nil);
			return;
		}
		m->peern = p->n;
		DEBUG(2, "reader: %M\n", m);
		sendp(out, m);
	}
}

void
writer(void *ap)
{
	Writerargs *wrargs;
	Channel *in, *out, *stalechan;
	Msg *m;
	char *errmsg;
	int fd;
	int peern;

	wrargs = ap;
	in = wrargs->in;
	out = wrargs->out;
	stalechan = wrargs->stalechan;
	fd = wrargs->fd;
	peern = wrargs->peern;
	free(wrargs);

	for(;;){
		m = recvp(in);
		errmsg = msgwrite(fd, m);
		if(errmsg){
			DEBUG(2, "writer: error: %s\n", errmsg);
			free(errmsg);
			msgfree(m);
			stale(stalechan, 0, peern, nil);
			return;
		}
		DEBUG(2, "writer: %M\n", m);
		sendp(out, m);
	}
}


void
dialer(void *ap)
{
	Channel *in;
	Channel *out;
	Channel *stalec;
	Channel **chans;
	Peer *p;

	chans = ap;
	in = chans[0];
	out = chans[1];
	stalec = chans[2];
	free(ap);

	for(;;){
		recv(in, &p);
		DEBUG(2, "dialer: connecting to ip=%s port=%s\n", p->ip, p->port);

		p->fd = dial(netmkaddr(p->ip, 0, p->port), 0, 0, 0);
		if(p->fd < 0){
			DEBUG(2, "dialing %s: %r\n", netmkaddr(p->ip, 0, p->port));
			stale(stalec, 1, -1, p);
			continue;
		}
		sendp(out, p);
	}
}


void
handshaker(void *ap)
{
	Channel *in;
	Channel *out;
	Channel *stalec;
	Channel **chans;
	Hsin hi;
	char *errmsg;
	uchar peerid[Peeridlen];
	Peer *p;
	uchar *infohash;
	uchar *ourpeerid;

	chans = ap;
	in = chans[0];
	out = chans[1];
	stalec = chans[2];
	free(ap);

	for(;;){
		recv(in, &hi);

		p = hi.p;
		infohash = hi.infohash;
		ourpeerid = hi.ourpeerid;

		errmsg = peerwritestart(p->fd, infohash, ourpeerid);
		if(errmsg){
			DEBUG(2, "handshaker: %s\n", errmsg);
			free(errmsg);
			stale(stalec, 1, -1, p);
			continue;
		}

		errmsg = peerreadstart(p->fd, infohash, peerid);
		if(errmsg){
			DEBUG(2, "handshaker: reading handshake: %s\n", errmsg);
			free(errmsg);
			stale(stalec, 1, -1, p);
			continue;
		}
		if(memcmp(p->peerid, "\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", Peeridlen) == 0){
			memmove(p->peerid, peerid, Peeridlen);
		}else{
			if(memcmp(p->peerid, peerid, Peeridlen) != 0){
				DEBUG(2, "handshaker: unexpected peerid\n");
				stale(stalec, 1, -1, p);
				continue;
			}
		}

		sendp(out, p);
	}
}


void
tracker(void *ap)
{
	Channel *in;
	Channel *prospect;
	Channel *ival;
	Channel **chans;
	Bee trackerb;
	Bee *tmp, *tmp2;
	Bee *ipb, *portb, *peeridb;
	Trackreq tr;
	ulong interval;
	char *errmsg;
	int i;
	Peer *p;

	chans = ap;
	in = chans[0];
	prospect = chans[1];
	ival = chans[2];
	free(ap);

	for(;;){
		recv(in, &tr);

		DEBUG(2, "calling trackerget\n");
		errmsg = trackerget(&trackerb, tr.announce, tr.listenport, tr.infohash, tr.peerid, tr.ul, tr.dl, tr.left, tr.event);
		if(errmsg){
			DEBUG(2, "trackerget: %s", errmsg);
			continue;
		}

		if(debug)
			beeprint(2, 0, &trackerb);
	
		tmp = beedictget(&trackerb, "interval", TInteger);
		if(tmp == nil){
			DEBUG(2, "missing key interval in initial response from tracker");
			continue;
		}
		interval = (ulong)tmp->i;
	
		tmp = beedictget(&trackerb, "peers", TList);
		if(tmp == nil){
			DEBUG(2, "missing key peers in initial response from tracker");
			continue;
		}
		for(i = 0; i < tmp->nl; i++){
			tmp2 = &tmp->l[i];
			if(tmp2->type != TDict)
				DEBUG(2, "element in peers list is not dict\n");
	
			ipb = beedictget(tmp2, "ip", TString);
			portb = beedictget(tmp2, "port", TInteger);
			peeridb = beedictget(tmp2, "peer id", TString);
	
			if(ipb == nil || portb == nil || peeridb == nil){
				DEBUG(2, "missing key ipb or portb or peeridb");
				continue;
			}

			/* XXX check if lenghts are okay  */

			p = peernew(tr.npieces);
			p->status |= Connecting;
			p->ip = estrdup(ipb->s);
			p->port = smprint("%lld", portb->i);
			memmove(p->peerid, peeridb->s, Peeridlen);

			sendp(prospect, p);
		}
	
		send(ival, &interval);
		beefree(&trackerb);
	}
}


void
torrentprep(Torrent *t)
{
	char adir[NETPATHLEN];
	int acfd;
	Listenargs *la;
	int port;
	char buf[16];
	Channel **dialargs;
	Channel **hsargs;
	Channel **trargs;
	int i;

	t->inpeer = chancreate(sizeof (Peer *), 0);
	t->timer = chancreate(sizeof (void *), 0);
	t->inmsg = chancreate(sizeof (void *), 0);
	t->dialpeer = chancreate(sizeof (Peer *), 0);
	t->dialedpeer = chancreate(sizeof (Peer *), 0);
	t->needshake = chancreate(sizeof (Hsin), 0);
	t->shakedpeer = chancreate(sizeof (Peer *), 0);
	t->stalepeer = chancreate(sizeof (Stale), 0);
	t->track = chancreate(sizeof (Trackreq), 0);
	t->prospect = chancreate(sizeof (Peer *), 0);
	t->newinterval = chancreate(sizeof (ulong), 0);
	t->written = chancreate(sizeof (Msg *), 0);

	genrandom(t->peerid, sizeof t->peerid);

	t->strategy = Random;
	t->dl = 0;
	t->ul = 0;
	t->rate = ratenew(60);
	t->interval = -1;
	t->listenport = nil;
	t->peers = nil;
	t->bad = nil;
	t->peern = 0;
	t->todial = nil;
	t->tohandshake = nil;
	t->reqpieces = bitnew(t->npieces, nil);

	port = Listenport;
	do {
		snprint(buf, sizeof buf, "%d", port);
		acfd = announce(netmkaddr("*", 0, buf), adir);
	} while(acfd < 0 && ++port < Listenport + Listenattempts);
	if(acfd < 0)
		sysfatal("could not announce: %r");
	DEBUG(2, "we have announced...\n");

	t->listenport = estrdup(buf);
	la = emalloc(sizeof la[0]);
	la->adir = estrdup(adir);
	la->newpeer = t->dialedpeer;
	la->t = t;

	proccreate(listener, la, STACKSIZE);
	proccreate(timer, t->timer, STACKSIZE);
	for(i = 0; i < 1; i++){
		dialargs = emalloc(sizeof dialargs[0] * 3);
		dialargs[0] = t->dialpeer;
		dialargs[1] = t->dialedpeer;
		dialargs[2] = t->stalepeer;
		proccreate(dialer, dialargs, STACKSIZE);
	}

	for(i = 0; i < 1; i++){
		hsargs = emalloc(sizeof hsargs[0] * 3);
		hsargs[0] = t->needshake;
		hsargs[1] = t->shakedpeer;
		hsargs[2] = t->stalepeer;
		proccreate(handshaker, hsargs, STACKSIZE);
	}

	trargs = emalloc(sizeof trargs[0] * 3);
	trargs[0] = t->track;
	trargs[1] = t->prospect;
	trargs[2] = t->newinterval;
	proccreate(tracker, trargs, STACKSIZE);
}


void
trackrequest(Torrent *t, char *event)
{
	Trackreq tr;

	tr.announce = t->announce;
	tr.listenport = t->listenport;
	tr.infohash = t->infohash;
	tr.peerid = t->peerid;
	tr.npieces = t->npieces;
	tr.ul = t->ul;
	tr.dl = t->dl;
	tr.left = t->length - t->stored;
	tr.event = event;
	send(t->track, &tr);
}


static void
handlemsg(Torrent *t, Peer *p, Msg *m)
{
	Bite *b;
	Peer *p2;
	Piece *pc;
	Msg *m2;
	Msg **mp;

	switch(m->type){
	case MKeepalive:
		DEBUG(2, "handlemsg: keepalive...\n");
		break;

	case MChoke:
		setischoking(p, 1);

		/* all requests we made to peer are now invalid, remove them */
		while(b = p->lreq){
			/* XXX we may still have this bite requested at another peer */
			pc = &t->pieces[b->piecen];
			bitunset(pc->reqbites, b->n);
			p->lreq = b->next;
			bitefree(b);
		}

		/* also remove requests we were stilll going to send */
		mp = &p->meta;
		while(m2 = *mp){
			if(m2->type == MRequest){
				*mp = m2->next;
				msgfree(m2);
			}
			mp = &(*mp)->next;
		}

		DEBUG(2, "handlemsg: we are now choked by peer=%d\n", p->n);
		break;

	case MUnchoke:
		setischoking(p, 0);
		DEBUG(2, "handlemsg: we are now unchoked by peer=%d\n", p->n);
		if(IsInteresting(p))
			piecepeerschedule(t, p);
		break;

	case MInterested:
		DEBUG(2, "handlemsg: peer=%d is now interested\n", p->n);
		setisinterested(p, 1);
		if(IsChoked(p) && nchoked(t) < WantUnchokedCount)
			unchoke(p);
		break;

	case MNotinterested:
		DEBUG(2, "handlemsg: peer=%d is not interested anymore\n", p->n);
		setisinterested(p, 0);
		if(!IsChoked(p))
			choke(p);
		break;

	case MHave:
		DEBUG(2, "handlemsg: peer=%d now has piece %uld\n", p->n, m->have);
		if(m->have >= t->npieces){
			DEBUG(2, "handlemsg: peer=%d sent m->have=%uld (>=  npieces=%d)\n", p->n, m->have, t->npieces);
			break;
		}
		if(bitget(p->pieces, m->have)){
			DEBUG(2, "handlemsg: we already knew peer=%d had piece %uld\n", p->n, m->have);
			break;
		}

		bitset(p->pieces, m->have);
		if(!IsInteresting(p) && bitinvertandlen(t->haves, p->pieces) > 0)
			interestedin(p);
		break;

	case MBitfield:
		if(p->havefirst){
			DEBUG(2, "handlemsg: have bitfield after first message\n");
			disconnect(t, p, 1);
		}

		if(bitnbytes(m->haves) != bitnbytes(p->pieces)){
			DEBUG(2, "handlemsg: wrong size of bitfield\n");
			disconnect(t, p, 1);
		}
		bitcopy(p->pieces, m->haves, bitlen(p->pieces));
		if(t->strategy == Seeding && bithaveall(p->pieces)){
			DEBUG(2, "handlemsg: disconnecting from other seeder\n");
			disconnect(t, p, 0);
			break;
		}
		pieceaddpeerhaves(t, p);
		DEBUG(2, "handlemsg: first message is bitfield (peer has %.2f%%)\n", 100*bithave(p->pieces));

		if(bitinvertandlen(t->haves, p->pieces) > 0)
			interestedin(p);

		break;

	case MRequest:
		DEBUG(2, "handlemsg: peer=%d requests piece=%uld begin=%uld length=%uld\n", p->n, m->index, m->begin, m->length);
		if(IsChoked(p)){
			DEBUG(2, "handlemsg: received request from choked peer, ignoring...\n");
			break;
		}
		/* XXX see that queue isn't growing too large */
		b = bitenew(-1, m->index, m->begin, m->length);
		biteappend(&p->rreq, b);
		kickwriter(p);
		break;

	case MPiece:
		if(m->index >= t->npieces){
			DEBUG(2, "handlemsg: peer=%d sent out-of-bound piecen=%uld\n", p->n, m->index);
			break;
		}

		b = bitefind(p->lreq, m->index, m->begin, m->length);
		if(b == nil){
			DEBUG(2, "handlemsg: incoming piece wasn't requested, ignoring...\n");
			break;
		}
		biteremove(&p->lreq, b);
		if(bitehave(t, b)){
			DEBUG(2, "handlemsg: already have incoming piece, ignoring...\n");
			break;
		}
		for(p2 = t->peers; p2; p2 = p2->next)
			cancel(p2, b);
		pc = &t->pieces[b->piecen];
		bitunset(pc->reqbites, b->n);

		filewrite(t, m->index*t->piecelen + m->begin, m->length, m->piece);
		rateadd(p->rate, 0, m->length);
		rateadd(t->rate, 0, m->length);

		pc = &t->pieces[m->index];
		bitset(pc->bites, b->n);
		if(bithaveall(pc->bites)){
			if(filepiecehashokay(t, pc)){
				for(p2 = t->peers; p2; p2 = p2->next){
					have(p2, m->index);
					if(bitinvertandlen(t->haves, p->pieces) == 0)
						notinterestedin(p);
				}
				bitset(t->haves, pc->n);
				t->stored += m->length;
				strategy(t);
				DEBUG(2, "handlemsg: have valid hash for piecen=%d\n", pc->n);
			}else{
				bitclear(pc->bites);
				bitclear(pc->reqbites);
				DEBUG(2, "handlemsg: invalid hash, have to recheck all\n");
			}
		}
		bitefree(b);

		piecepeerschedule(t, p);
		break;

	case MCancel:
		DEBUG(2, "handlemsg: peer=%d cancels piece=%uld begin=%uld length=%uld\n", p->n, m->index, m->index, m->length);
		b = bitefind(p->rreq, m->index, m->begin, m->length);
		if(b == nil){
			DEBUG(2, "handlemsg: could not find rreq to cancel\n");
			break;
		}
		biteremove(&p->rreq, b);
		bitefree(b);
		break;

	default:
		sysfatal("cannot happen in handlemsg\n");
	}

	p->havefirst = 1;
}

void
torrentdo(Torrent *t)
{
	ulong interval;
	Peer *p;
	Hsin hi;
	Msg *m;
	Readerargs *rdargs;
	Writerargs *wrargs;
	Peer *nc[WantUnchokedCount];
	int n;
	int i;
	ulong now;
	Peer *pnew, *pworst;
	Stale stale;
	Bite *b;
	Alt a[] = {
		{t->timer,			nil,		CHANRCV},
		{t->inpeer,		&p,		CHANRCV},
		{t->inmsg,		&m,		CHANRCV},
		{t->dialedpeer,		&p,		CHANRCV},
		{t->shakedpeer,	&p,		CHANRCV},
		{t->stalepeer,		&stale,	CHANRCV},
		{t->prospect,		&p,		CHANRCV},
		{t->newinterval,	&interval,	CHANRCV},
		{t->dialpeer,		nil,		CHANSND},
		{t->needshake,		nil,		CHANSND},
		{t->written,		&m,		CHANRCV},
		{nil, nil, CHANEND},
	};

	strategy(t);
	for(;;){
		a[8].v = &t->todial;
		a[8].op = t->todial ? CHANSND : CHANNOP;

		hi.p = t->tohandshake;
		hi.infohash = t->infohash;
		hi.ourpeerid = t->peerid;
		a[9].v = &hi;
		a[9].op = hi.p ? CHANSND : CHANNOP;

		switch(alt(a)){
		case 0:	/* timer */
			DEBUG(2, "do: timer npeers=%d nchoked=%d\n", peerlen(t->peers), nchoked(t));

			now = time(0);
			ratetick(t->rate);
			for(p = t->peers; p; p = p->next)
				ratetick(p->rate);

			for(p = t->peers; p; p = p->next)
				p->changed = 0;

			if(now % 10 == 0){
				rethinkunchokes(t, nc, &n);
				for(i = 0; i < n; i++)
					if(IsChoked(nc[i]))
						unchoke(nc[i]);
				for(p = t->peers; p; p = p->next)
					if(!IsChoked(p) && !p->changed)
						choke(p);
			}
			if(now % 30 == 0){
				pnew = randomunchokable(t);
				if(pnew){
					pworst = worstchokable(t);
					if(pworst){
						choke(pworst);
						unchoke(pnew);
					}
				}
			}
			if(now % 60 == 0){
				for(p = t->peers; p; p = p->next)
					if(!p->changed && p->lastchange < now-60 && ratesum(p->rate, 60, (t->strategy == Seeding) ? Upload : Download) == 0)
						choke(p);
			}
			if(t->interval < 0 || time(0) % t->interval == 0){
				trackrequest(t, (t->interval < 0) ? "started" : "");

				if(t->interval < 0)
					t->interval = DefaultInterval;
			}

			for(p = t->peers; p; p = p->next)
				p->changed = 0;

			break;

		case 1:	/* new peer that connected to us */
			if(peermatch(t->peers, p) || peermatchbyip(t->bad, p)){
				disconnect(t, p, 0);
				break;
			}

			peeradd(&t->tohandshake, p);
			break;

		case 2:	/* incoming message */
			p = peerfind(t->peers, m->peern);
			if(p)
				handlemsg(t, p, m);
			msgfree(m);
			break;

		case 3:	/* dialed peer */
			if(t->strategy == Seeding){
				disconnect(t, p, 0);
				break;
			}
			peeradd(&t->tohandshake, p);
			break;

		case 4:	/* handshaked peer */
			if(peermatch(t->peers, p) || peermatchbyip(t->bad, p)){
				disconnect(t, p, 0);
				break;
			}

			p->status &= ~Connecting;
			p->status |= Valid;
			p->n = t->peern++;
			p->next = nil;
			peeradd(&t->peers, p);

			rdargs = emalloc(sizeof rdargs[0]);
			rdargs->out = t->inmsg;
			rdargs->stalechan = t->stalepeer;
			rdargs->p = p;
			rdargs->nbytes = bitnbytes(t->haves);
			proccreate(reader, rdargs, STACKSIZE);

			wrargs = emalloc(sizeof wrargs[0]);
			wrargs->in = p->write;
			wrargs->out = t->written;
			wrargs->fd = p->fd;
			wrargs->stalechan = t->stalepeer;
			wrargs->peern = p->n;
			proccreate(writer, wrargs, STACKSIZE);

			if(bitnhave(t->haves) > 0){
				m = msgnew(MBitfield, p->n);
				m->haves = bitnew(bitlen(t->haves), nil);
				bitcopy(m->haves, t->haves, bitlen(m->haves));
			}else{
				m = msgnew(MKeepalive, p->n);
			}
			msgappend(&p->meta, m);
			kickwriter(p);

			if(IsInterested(p) && IsChoked(p) && nchoked(t) < WantUnchokedCount){
				unchoke(p);
			}

			break;

		case 5:	/* stale peer */
			DEBUG(2, "do: stale peer\n");

			if(stale.p == nil)
				p = peerfind(t->peers, stale.peern);
			if(p == nil)
				break;

			disconnect(t, p, stale.bad);
			break;

		case 6:	/* new prospect peer */
			if(t->strategy == Seeding || peermatch(t->peers, p) || peermatchbyip(t->bad, p)){
				disconnect(t, p, 0);
				break;
			}

			DEBUG(2, "do: new prospect peer (%s %s %H)\n", p->ip, p->port, p->peerid);
			peeradd(&t->todial, p);
			break;

		case 7:	/* new interval */
			t->interval = MAX(MinInterval, interval);
			DEBUG(2, "new interval=%d\n", t->interval);
			break;

		case 8:	/* send to dialer */
			peerpop(&t->todial);
			break;

		case 9:	/* send to handshaker */
			peerpop(&t->tohandshake);
			break;

		case 10:	/* message has been written to peer */
			p = peerfind(t->peers, m->peern);
			if(p == nil)
				break;

			if(m->type == MPiece){
				rateadd(p->rate, m->length, 0);
				rateadd(t->rate, m->length, 0);
			}
			msgfree(m);

			if(p->meta){
				sendp(p->write, p->meta);
				p->meta = p->meta->next;
			}
			else if(p->dataout){
				sendp(p->write, p->dataout);
				p->dataout = p->dataout->next;
			}else if(p->rreq){
				b = p->rreq;
				p->rreq = p->rreq->next;
				b->next = nil;
				m = msgnew(MPiece, -1);
				m->piece = emalloc(b->length);
				m->length = b->length;
				m->index = b->piecen;
				m->begin = b->offset;
				fileread(t, b->piecen*t->piecelen + b->n*Bitelength, b->length, m->piece);
				bitefree(b);
				sendp(p->write, m);
			}
			break;

		default:
			sysfatal("invalid case in alt\n");
		}
	}
}

Bell Labs OSI certified Powered by Plan 9

(Return to Plan 9 Home Page)

Copyright © 2021 Plan 9 Foundation. All Rights Reserved.
Comments to [email protected].