EndGame0/sourcecode/gobalance/pkg/onionbalance/consensus.go
2024-10-23 20:50:14 +05:30

584 lines
15 KiB
Go

package onionbalance
import (
"bufio"
"bytes"
"encoding/base64"
"encoding/hex"
"errors"
"fmt"
"github.com/sirupsen/logrus"
"gobalance/pkg/btime"
"io"
"net"
"regexp"
"strconv"
"strings"
"sync"
"time"
)
// Consensus bundles the most recently parsed consensus document with the list
// of TorNode values built from it.
type Consensus struct {
// Nodes is the current node list. GetNodes and SetNodes access it while
// holding nodeMtx.
Nodes []*TorNode
// nodeMtx guards Nodes.
nodeMtx sync.RWMutex
// consensus is the document assigned by refresh; it may be nil.
consensus *ConsensusDoc
// controller is used to fetch the consensus text and the microdescriptors.
controller *Controller
}
// NewConsensus builds a Consensus bound to the given controller. The node list
// and the consensus document start out empty; when doRefreshConsensus is true
// a refresh is attempted immediately before returning.
func NewConsensus(controller *Controller, doRefreshConsensus bool) *Consensus {
	c := &Consensus{controller: controller}

	// Start with no nodes and no consensus document.
	c.SetNodes(nil)
	c.consensus = nil

	if doRefreshConsensus {
		c.refresh()
	}
	return c
}
// GetNodes returns the current node list, reading it under the read lock.
func (c *Consensus) GetNodes() []*TorNode {
	c.nodeMtx.RLock()
	current := c.Nodes
	c.nodeMtx.RUnlock()
	return current
}
// SetNodes replaces the node list with newNodes while holding the write lock.
func (c *Consensus) SetNodes(newNodes []*TorNode) {
	c.nodeMtx.Lock()
	c.Nodes = newNodes
	c.nodeMtx.Unlock()
}
// Consensus returns the currently stored consensus document, which may be nil.
// The field is read without taking nodeMtx.
func (c *Consensus) Consensus() *ConsensusDoc {
return c.consensus
}
// refresh attempts to replace the stored consensus with the latest one
// available from the controller and, if that consensus is live, rebuilds the
// node list from it.
//
// A consensus that fails to parse is ignored: the previously stored document
// is kept rather than being overwritten with nil, so a transient bad read does
// not discard a consensus that was already loaded.
func (c *Consensus) refresh() {
	mdConsensusStr, err := c.controller.GetMdConsensus()
	if err != nil {
		logrus.Errorf("Failed to GetMdConsensus: %v", err)
		return
	}

	// Parse into a local first so that c.consensus is only touched on success.
	doc, err := NetworkStatusDocumentV3(mdConsensusStr)
	if err != nil {
		logrus.Warnf("No valid consensus received (%v). Waiting for one...", err)
		return
	}
	c.consensus = doc

	if !c.IsLive() {
		logrus.Info("Loaded consensus is not live. Waiting for a live one.")
		return
	}
	c.SetNodes(c.initializeNodes())
}
// IsLive reports whether the stored consensus is "reasonably live": the
// current time must fall strictly between ValidAfter minus 24 hours and
// ValidUntil plus 24 hours. It returns false when no consensus is stored.
// This replicates the behavior of the little-t-tor
// networkstatus_get_reasonably_live_consensus() function.
func (c *Consensus) IsLive() bool {
	doc := c.consensus
	if doc == nil {
		return false
	}

	const reasonablyLive = 24 * time.Hour
	now := btime.Clock.Now().UTC()

	earliest := doc.ValidAfter.Add(-reasonablyLive)
	if !now.After(earliest) {
		return false
	}
	latest := doc.ValidUntil.Add(reasonablyLive)
	return now.Before(latest)
}
// initializeNodes builds the node list for the current consensus by pairing
// every router status with the microdescriptor that has the same digest.
// Router statuses without a matching microdescriptor are skipped. If the
// microdescriptors cannot be fetched an empty (non-nil) list is returned.
func (c *Consensus) initializeNodes() []*TorNode {
	result := []*TorNode{}

	mds, err := c.controller.GetMicrodescriptors()
	if err != nil {
		logrus.Warn("Can't get microdescriptors from Tor. Delaying...")
		return result
	}

	// Index the microdescriptors by digest so each router status can be
	// matched with a single map lookup.
	byDigest := make(map[string]MicroDescriptor)
	for _, md := range mds {
		byDigest[md.Digest()] = md
	}

	for _, getStatus := range c.getRouterStatuses() {
		rs := getStatus()
		logrus.Debugf("Checking routerstatus with md digest %s", rs.Digest)
		if md, ok := byDigest[rs.Digest]; ok {
			result = append(result, NewNode(md, rs))
		} else {
			logrus.Debugf("Could not find microdesc for rs with fpr %s", rs.Fingerprint)
		}
	}
	return result
}
// getRouterStatuses returns the router-status map of the stored consensus.
// It panics when the consensus is not live, so callers must check IsLive
// first.
func (c *Consensus) getRouterStatuses() map[Fingerprint]GetStatus {
	if c.IsLive() {
		return c.consensus.Routers
	}
	panic("getRouterStatuses and not live")
}
// NetworkStatusDocumentV3 parses a v3 network status document and returns the
// resulting ConsensusDoc.
//
// The first two lines of mdConsensusStr are discarded before parsing
// (presumably reply framing that precedes the document; TODO confirm against
// Controller.GetMdConsensus). The header is then read with extractMetaInfo,
// which also fills in the validity times, and every router status entry is
// parsed with ParseRawStatus and stored in Routers keyed by its sanitised
// fingerprint.
//
// An error is returned when the input is too short to contain a document, when
// the header cannot be parsed, or when any status entry is malformed.
func NetworkStatusDocumentV3(mdConsensusStr string) (*ConsensusDoc, error) {
	// Splitting into at most three parts yields the two skipped lines plus
	// the untouched remainder, without splitting the whole document.
	parts := strings.SplitN(mdConsensusStr, "\n", 3)
	if len(parts) < 3 {
		return nil, fmt.Errorf("consensus document too short: %q", mdConsensusStr)
	}

	consensus := NewConsensus1()
	br := bufio.NewReader(strings.NewReader(parts[2]))
	if err := extractMetaInfo(br, consensus); err != nil {
		return nil, fmt.Errorf("metadata info extraction failed: %w", err)
	}

	queue := make(chan QueueUnit)
	go DissectFile(br, extractStatusEntry, queue)
	// DissectFile sends on an unbuffered channel. If we return early it would
	// block forever on its next send, so drain the queue on the way out. After
	// a normal run the channel is already closed and this returns at once.
	defer func() {
		for range queue {
		}
	}()

	// Parse incoming router statuses until the channel is closed by the
	// sending side.
	for unit := range queue {
		if unit.Err != nil {
			return nil, unit.Err
		}
		fingerprint, getStatus, err := ParseRawStatus(unit.Blurb)
		if err != nil {
			return nil, err
		}
		consensus.Routers[SanitiseFingerprint(fingerprint)] = getStatus
	}
	return consensus, nil
}
// NewConsensus1 serves as a constructor and returns a pointer to a freshly
// allocated ConsensusDoc whose Routers map is initialised and empty.
func NewConsensus1() *ConsensusDoc {
return &ConsensusDoc{Routers: make(map[Fingerprint]GetStatus)}
}
// ParseRawStatus parses a raw router status (in string format) and returns the
// router's fingerprint, a function which returns the parsed RouterStatus, and
// an error if there were any during parsing.
//
// Each line is split on single spaces and dispatched on its first word; lines
// with an unrecognised first word are ignored. A recognised line that has too
// few fields yields an error instead of an out-of-range panic.
func ParseRawStatus(rawStatus string) (Fingerprint, GetStatus, error) {
	status := new(RouterStatus)

	malformed := func(kind, line string) error {
		return fmt.Errorf("malformed %q line in router status: %q", kind, line)
	}

	// Go over the raw status line by line and extract the fields we are
	// interested in.
	for _, line := range strings.Split(rawStatus, "\n") {
		words := strings.Split(line, " ")
		switch words[0] {
		case "r":
			// Fields are read by index: 1 nickname, 2 identity, 3-4
			// publication date and time, 5 IPv4 address, 6 ORPort, 7 DirPort.
			if len(words) < 8 {
				return "", nil, malformed("r", line)
			}
			status.Nickname = words[1]
			fingerprint, err := Base64ToString(words[2])
			if err != nil {
				return "", nil, err
			}
			status.Fingerprint = SanitiseFingerprint(Fingerprint(fingerprint))
			// Best effort: an unparsable timestamp leaves Publication at the
			// zero time rather than rejecting the whole entry.
			publish, _ := time.Parse(publishedTimeLayout, strings.Join(words[3:5], " "))
			status.Publication = publish
			status.Address.IPv4Address = net.ParseIP(words[5])
			status.Address.IPv4ORPort = StringToPort(words[6])
			status.Address.IPv4DirPort = StringToPort(words[7])
		case "a":
			if len(words) < 2 {
				return "", nil, malformed("a", line)
			}
			status.Address.IPv6Address, status.Address.IPv6ORPort = parseIPv6AddressAndPort(words[1])
		case "m":
			if len(words) < 2 {
				return "", nil, malformed("m", line)
			}
			status.Digest = words[1]
		case "s":
			// A bare "s" line simply carries no flags.
			status.Flags = *parseRouterFlags(words[1:])
		case "v":
			if len(words) < 3 {
				return "", nil, malformed("v", line)
			}
			status.TorVersion = words[2]
		case "w":
			if len(words) < 2 {
				return "", nil, malformed("w", line)
			}
			values := strings.Split(words[1], "=")
			if len(values) < 2 {
				return "", nil, malformed("w", line)
			}
			// Best effort: a non-numeric value leaves Bandwidth at 0.
			status.Bandwidth, _ = strconv.ParseUint(values[1], 10, 64)
		case "p":
			if len(words) < 2 {
				return "", nil, malformed("p", line)
			}
			status.Accept = words[1] == "accept"
			status.PortList = strings.Join(words[2:], " ")
		}
	}
	return status.Fingerprint, func() *RouterStatus { return status }, nil
}
const (
// publishedTimeLayout is the time layout ParseRawStatus uses to parse the
// joined publication date and time fields of an "r" line.
publishedTimeLayout = "2006-01-02 15:04:05"
)
// SanitiseFingerprint normalises a fingerprint: surrounding white space is
// stripped and the remaining text is converted to upper case.
func SanitiseFingerprint(fingerprint Fingerprint) Fingerprint {
	trimmed := strings.TrimSpace(string(fingerprint))
	return Fingerprint(strings.ToUpper(trimmed))
}
// Patterns used by parseIPv6AddressAndPort. They are compiled once at package
// level rather than on every call.
var (
	// ipv6AddressPattern captures the text between the first "[" and the
	// next "]".
	ipv6AddressPattern = regexp.MustCompile(`\[(.*?)\]`)
	// ipv6PortPattern captures everything following the first "]:".
	ipv6PortPattern = regexp.MustCompile(`\]:(.*)`)
)

// parseIPv6AddressAndPort splits a bracketed "[address]:port" string into its
// address and port. A part that is missing or cannot be parsed is returned as
// its zero value (nil address, port 0) instead of causing a panic.
func parseIPv6AddressAndPort(addressAndPort string) (address net.IP, port uint16) {
	if m := ipv6AddressPattern.FindStringSubmatch(addressAndPort); m != nil {
		address = net.ParseIP(m[1])
	}
	if m := ipv6PortPattern.FindStringSubmatch(addressAndPort); m != nil {
		port = StringToPort(m[1])
	}
	return address, port
}
// StringToPort converts a decimal port string to an unsigned 16-bit integer.
// It returns 0 when the string is not a valid base-10 number or the value
// does not fit in 16 bits.
func StringToPort(portStr string) uint16 {
	if parsed, err := strconv.ParseUint(portStr, 10, 16); err == nil {
		return uint16(parsed)
	}
	return 0
}
// parseRouterFlags turns a list of flag names into a RouterFlags value with
// the corresponding boolean fields set. Names that are not recognised are
// ignored.
func parseRouterFlags(flags []string) *RouterFlags {
	parsed := &RouterFlags{}

	// Map every known flag name to the field it switches on.
	fieldByName := map[string]*bool{
		"Authority": &parsed.Authority,
		"BadExit":   &parsed.BadExit,
		"Exit":      &parsed.Exit,
		"Fast":      &parsed.Fast,
		"Guard":     &parsed.Guard,
		"HSDir":     &parsed.HSDir,
		"Named":     &parsed.Named,
		"Stable":    &parsed.Stable,
		"Running":   &parsed.Running,
		"Unnamed":   &parsed.Unnamed,
		"Valid":     &parsed.Valid,
		"V2Dir":     &parsed.V2Dir,
	}

	for _, name := range flags {
		if field, known := fieldByName[name]; known {
			*field = true
		}
	}
	return parsed
}
// Base64ToString decodes the given Base64-encoded string and returns the
// decoded bytes as a hex string. If decoding fails, an empty string and the
// decoding error are returned.
func Base64ToString(encoded string) (string, error) {
	// dir-spec.txt says that Base64 padding is removed, so restore however
	// many "=" characters are needed to reach a multiple of four.
	missing := (4 - len(encoded)%4) % 4
	padded := encoded + strings.Repeat("=", missing)

	raw, err := base64.StdEncoding.DecodeString(padded)
	if err != nil {
		return "", err
	}
	return hex.EncodeToString(raw), nil
}
// QueueUnit is one item sent by DissectFile over its queue: either an
// extracted text chunk or a scanner error.
type QueueUnit struct {
// Blurb is the extracted chunk; empty when Err is set.
Blurb string
// Err is the scanner error, or nil for a regular chunk.
Err error
}
// Fingerprint represents a relay's fingerprint as 40 hex digits. It is the key
// type of ConsensusDoc.Routers; SanitiseFingerprint produces the trimmed,
// upper-case form used for those keys.
type Fingerprint string
// GetStatus is a function that returns a parsed RouterStatus. ParseRawStatus
// returns one as a closure over the status it just parsed.
type GetStatus func() *RouterStatus
// RouterStatus holds the fields ParseRawStatus extracts from one router status
// entry of a consensus.
type RouterStatus struct {
// Fields taken from the "r" line (nickname, identity, publication time).
// Digest is the exception: it is filled from the "m" line.
Nickname string
Fingerprint Fingerprint
Digest string
Publication time.Time
// IPv4 fields come from the "r" line, IPv6 fields from the "a" line.
Address RouterAddress
// The single fields of an "s" line.
Flags RouterFlags
// The single fields of a "v" line.
TorVersion string
// Fields of a "w" line. ParseRawStatus only fills Bandwidth; Measured and
// Unmeasured are not assigned in the code visible in this file.
Bandwidth uint64
Measured uint64
Unmeasured bool
// The single fields of a "p" line. Accept is true when the line's first
// field is "accept"; PortList is the remaining fields joined by spaces.
Accept bool
PortList string
}
// RouterFlags records which flags were present on a router's "s" line. Each
// field corresponds to the identically named flag recognised by
// parseRouterFlags and is true when that flag was listed.
type RouterFlags struct {
Authority bool
BadExit bool
Exit bool
Fast bool
Guard bool
HSDir bool
Named bool
Stable bool
Running bool
Unnamed bool
Valid bool
V2Dir bool
}
// RouterAddress holds the network addresses and ports of a router as parsed by
// ParseRawStatus.
type RouterAddress struct {
// IPv4 address, ORPort and DirPort, taken from the "r" line.
IPv4Address net.IP
IPv4ORPort uint16
IPv4DirPort uint16
// IPv6 address and ORPort, taken from the "a" line; zero values when the
// entry has no "a" line.
IPv6Address net.IP
IPv6ORPort uint16
}
// ConsensusDoc is the parsed form of a consensus document: header metadata,
// validity period, shared randomness values and the per-router statuses.
type ConsensusDoc struct {
// Generic map of consensus metadata. extractMetaInfo keys it by the first
// word of each header line and stores the rest of the line, trimmed; the
// "params" line is handled separately and is not stored here.
MetaInfo map[string][]byte
// Document validity period, parsed from the "valid-after", "fresh-until"
// and "valid-until" header lines.
ValidAfter time.Time
FreshUntil time.Time
ValidUntil time.Time
// Shared randomness, decoded from the "shared-rand-previous-value" and
// "shared-rand-current-value" header lines when they are present.
sharedRandomnessPreviousValue []byte
sharedRandomnessCurrentValue []byte
// A map from relay fingerprint to a function which returns the relay
// status.
Routers map[Fingerprint]GetStatus
// The spread score for HSDIR selection, read from the
// "hsdir_spread_store" entry of the "params" line.
SpreadScore int
}
// extractMetaInfo extracts meta information of the open consensus document
// (such as its validity times) and writes it to the provided consensus struct.
// It assumes that the type annotation has already been read.
//
// Header lines are read from br until the next line starts with "dir-source"
// or "fingerprint". Each line is split at its first space into key and value
// and stored in c.MetaInfo, except for the "params" line, which is scanned for
// the "hsdir_spread_store" entry. Afterwards the validity times and, when
// present, the shared randomness values are decoded from c.MetaInfo.
//
// It returns an error when a read fails, a header line has no space, a
// validity timestamp is missing or unparsable, or a shared-rand line is
// malformed. It panics (via logrus.Panic) if the hsdir_spread_store value is
// not an integer.
func extractMetaInfo(br *bufio.Reader, c *ConsensusDoc) error {
	c.MetaInfo = make(map[string][]byte)

	// Read the initial metadata. We'll later extract information of particular
	// interest by name. A Reader is used instead of a Scanner so that the rest
	// of the document stays available in br for the caller.
	for {
		// ReadBytes returns a freshly allocated slice. ReadSlice must not be
		// used here: its result aliases the reader's internal buffer and is
		// invalidated by the next read, which would corrupt the values kept
		// in c.MetaInfo. It would also fail on lines longer than the buffer.
		line, err := br.ReadBytes('\n')
		if err != nil {
			return err
		}

		// splits to (key, value)
		split := bytes.SplitN(line, []byte(" "), 2)
		if len(split) != 2 {
			return errors.New("malformed metainfo line")
		}
		key := string(split[0])
		logrus.Debug("[Consensus] ", key)

		if key == "params" {
			for _, v := range bytes.SplitAfter(line, []byte(" ")) {
				if bytes.HasPrefix(v, []byte("hsdir_spread_store")) {
					splitInnerParams := bytes.SplitN(v, []byte("="), 2)
					if len(splitInnerParams) != 2 {
						return errors.New("malformed hsdir_spread_store param line! POTENTIAL CONSENSUS COMPROMISE")
					}
					c.SpreadScore, err = strconv.Atoi(strings.TrimSpace(string(splitInnerParams[1])))
					if err != nil {
						logrus.Panic("SpreadScore couldn't be parsed as int!", err)
					}
					// Propagate a changed spread score to the shared params.
					p := Params()
					if c.SpreadScore != p.HsdirSpreadStore() {
						logrus.Debugf("[Consensus] Spread score set to %d", c.SpreadScore)
						p.SetHsdirSpreadStore(c.SpreadScore)
					}
				}
				logrus.Debugf("[Consensus][Params] %s", string(v))
			}
		} else {
			c.MetaInfo[key] = bytes.TrimSpace(split[1])
		}

		// Look ahead to check if we've reached the end of the unique keys.
		nextKey, err := br.Peek(11)
		if err != nil {
			return err
		}
		if bytes.HasPrefix(nextKey, []byte("dir-source")) || bytes.HasPrefix(nextKey, []byte("fingerprint")) {
			break
		}
	}

	var err error

	// Define a parser for validity timestamps
	parseTime := func(line []byte) (time.Time, error) {
		return time.Parse("2006-01-02 15:04:05", string(line))
	}

	// Extract the validity period of this consensus
	c.ValidAfter, err = parseTime(c.MetaInfo["valid-after"])
	if err != nil {
		return err
	}
	c.FreshUntil, err = parseTime(c.MetaInfo["fresh-until"])
	if err != nil {
		return err
	}
	c.ValidUntil, err = parseTime(c.MetaInfo["valid-until"])
	if err != nil {
		return err
	}

	// Reads a shared-rand line from the consensus and returns decoded bytes.
	parseRand := func(line []byte) ([]byte, error) {
		// should split to (vote count, b64 bytes)
		split := bytes.SplitN(line, []byte(" "), 2)
		if len(split) != 2 {
			return nil, errors.New("malformed shared random line")
		}
		return base64.StdEncoding.DecodeString(string(split[1]))
	}

	// Only the newer consensus documents have these values.
	if line, ok := c.MetaInfo["shared-rand-previous-value"]; ok {
		val, err := parseRand(line)
		if err != nil {
			return err
		}
		c.sharedRandomnessPreviousValue = val
	}
	if line, ok := c.MetaInfo["shared-rand-current-value"]; ok {
		val, err := parseRand(line)
		if err != nil {
			return err
		}
		c.sharedRandomnessCurrentValue = val
	}
	return nil
}
// DissectFile cuts the data read from r into string chunks using the given
// split function and sends each chunk over queue for the receiving end to
// parse. If scanning stops with an error, that error is sent as a final unit.
// The queue is closed when the function returns.
func DissectFile(r io.Reader, extractor bufio.SplitFunc, queue chan QueueUnit) {
	defer close(queue)

	sc := bufio.NewScanner(r)
	sc.Split(extractor)

	for sc.Scan() {
		queue <- QueueUnit{Blurb: sc.Text()}
	}
	if scanErr := sc.Err(); scanErr != nil {
		queue <- QueueUnit{Err: scanErr}
	}
}
// extractStatusEntry is a bufio.SplitFunc that extracts individual network
// status entries. An entry starts at a line beginning with "r " and runs up
// to the next such line. The last entry ends where "directory-signature"
// begins and is returned together with bufio.ErrFinalToken to stop scanning.
func extractStatusEntry(data []byte, atEOF bool) (advance int, token []byte, err error) {
	entryBreak := []byte("\nr ")

	if len(data) == 0 && atEOF {
		return 0, nil, nil
	}

	// Find where the first entry begins: either right at the start of data,
	// or just after the first newline that is followed by "r ".
	begin := 0
	if !bytes.HasPrefix(data, entryBreak[1:]) {
		idx := bytes.Index(data, entryBreak)
		switch {
		case idx >= 0:
			begin = idx + 1
		case atEOF:
			return 0, nil, fmt.Errorf("cannot find beginning of status entry: \"\\nr \"")
		default:
			// Request more data.
			return 0, nil, nil
		}
	}

	rest := data[begin:]

	// A following entry marks the end of this one; keep the trailing newline.
	if next := bytes.Index(rest, entryBreak); next >= 0 {
		stop := begin + next + 1
		return stop, data[begin:stop], nil
	}

	// "directory-signature" means this is the last status; stop scanning.
	if sig := bytes.Index(rest, []byte("directory-signature")); sig >= 0 {
		stop := begin + sig
		return stop, data[begin:stop], bufio.ErrFinalToken
	}

	if atEOF {
		return len(data), rest, errors.New("no status entry")
	}

	// Request more data.
	return 0, nil, nil
}