跳过正文
  1. Posts/

公平竞争 Chan 模型

·751 字·2 分钟
技术 Architecture
ub(s)erate
作者
ub(s)erate

源自这个文章:

读《Your biggest customer might be your biggest bottleneck》
·158 字·1 分钟
阅读 短篇阅读

仓库:

公平竞争模型在 Golang 中似乎非常好模拟.

我们先来看一下 golang 中如何实现一个基础的 FIFO 模型.

消息
#

整个模拟我希望可以简单一点, 所以我们简化一下消息内容:

type Message struct{
    Tenant string // 租户信息
    Message int   // 消息内容
}

队列
#

https://github.com/Uberate/fair_go_chan_mock/blob/main/type.go

package main

import "errors"

type Queue interface {
	Next() (tenant string, message int)
	Put(tenant string, message int) error
	IsEmpty() bool
}

var ChanWasFull = errors.New("channel full")

type Message struct {
	tenant  string
	message int // use in represent the message info and time index.
}

func Generator() func(tenant string) Message {
	m := map[string]int{}

	return func(tenant string) Message {
		if _, ok := m[tenant]; !ok {
			m[tenant] = 0
		}

		m[tenant]++
		return Message{tenant, m[tenant]}
	}
}

FIFO
#

这个对于 Golang 来说非常简单,直接用一个 chan 即可

https://github.com/Uberate/fair_go_chan_mock/blob/main/fifo.go

package main

func NewFifoChan(bufferSize int) Queue {
	return &FifoChan{
		c: make(chan Message, bufferSize),
	}
}

type FifoChan struct {
	c chan Message
}

func (fcr *FifoChan) Next() (tenant string, obj int) {
	v := <-fcr.c
	return v.tenant, v.message
}

func (fcr *FifoChan) Put(tenant string, obj int) error {
	m := Message{tenant, obj}
	select {
	case fcr.c <- m:
	default:
		return ChanWasFull
	}
	return nil
}

func (fcr *FifoChan) IsEmpty() bool {
	return len(fcr.c) == 0
}

Fair 模型
#

这个对于 golang 来说,也不是特别困难,这里直接给每个租户都添加了一个 Chan 。

https://github.com/Uberate/fair_go_chan_mock/blob/main/fair.go

package main

import (
	"sync"
)

type FairChan struct {
	bufferSize int
	channels   map[string]chan Message
	tenants    []string
	index      int
	mu         sync.Mutex
}

func NewFairChan(bufferSize int) Queue {
	return &FairChan{
		bufferSize: bufferSize,
		channels:   make(map[string]chan Message),
		tenants:    make([]string, 0),
		index:      0,
	}
}

func (fc *FairChan) Put(tenant string, message int) error {
	fc.mu.Lock()
	if _, exists := fc.channels[tenant]; !exists {
		fc.channels[tenant] = make(chan Message, fc.bufferSize)
		fc.tenants = append(fc.tenants, tenant)
	}
	fc.mu.Unlock()

	msg := Message{tenant, message}
	select {
	case fc.channels[tenant] <- msg:
		return nil
	default:
		return ChanWasFull
	}
}

func (fc *FairChan) Next() (string, int) {
	fc.mu.Lock()
	defer fc.mu.Unlock()

	if len(fc.tenants) == 0 {
		return "", 0
	}

	// Round-robin through tenants
	for i := 0; i < len(fc.tenants); i++ {
		tenant := fc.tenants[fc.index]

		// Try to get message from current tenant's channel
		select {
		case msg := <-fc.channels[tenant]:
			fc.index = (fc.index + 1) % len(fc.tenants)
			return msg.tenant, msg.message
		default:
			// No message from this tenant, move to next
			fc.index = (fc.index + 1) % len(fc.tenants)
		}
	}

	// No messages available from any tenant
	return "", 0
}

func (fc *FairChan) IsEmpty() bool {
	fc.mu.Lock()
	defer fc.mu.Unlock()

	if len(fc.tenants) == 0 {
		return true
	}

	// Check if all tenant channels are empty
	for _, ch := range fc.channels {
		if len(ch) > 0 {
			return false
		}
	}
	return true
}

Mock
#

然后我们就来模拟一下吧~

我们构造这个输入:

t1: +++++.++++++++++++....+++++++++++++
t2: +...+.............+...+............
t3: ....++............+++++............

t1、t2、t3 表示三个不同的租户。上面的代码段中,+. 分别表示输入的状态。

然后我们运行后,得到了如下两个输出结果

FIFO:

t1: ++.+++...++++++++++++.....+..++++++++++++
t2: ..+...+.................+..+.............
t3: .......++............+++.+..+............

Fair:

t1: +.+..+..+..+.+.+.+.++++++++++++++++++++++
t2: .+.+..+..+...............................
t3: ....+..+..+.+.+.+.+......................

可以看到 FIFO 的 t2、t3 终止在相对较后的位置,但是在 Fair 的模式下,较早的结束了。

事实证明,通过 Fair 的逻辑可以解决邻居吵闹的模型。