0

0

这个Golang程序中的同步问题

WBOY

WBOY

发布时间:2024-02-10 09:06:09

|

491人浏览过

|

来源于stackoverflow

转载

这个golang程序中的同步问题

php小编苹果今天为大家介绍一个有趣的话题——"这个Golang程序中的同步问题"。在编写并发程序时,我们经常会遇到同步问题,即多个线程之间的竞争和协调。Golang作为一门并发编程语言,提供了丰富的同步机制和工具,但也存在一些常见的同步问题需要我们注意和解决。本文将详细探讨这些问题,并给出相应的解决方案,帮助大家更好地理解和应对Golang中的同步挑战。让我们一起来探索吧!

问题内容

我正在尝试创建一个充当代理服务器并可以动态切换到新端点的程序。但我遇到一个问题,在调用 switchovertonewendpoint() 后,仍然有一些代理对象连接到原始端点 8.8.8.8 ,应该将其关闭。

package main

import (
    "net"
    "sync"
    "sync/atomic"
    "time"
)

type proxy struct {
    id       int32
    from, to *net.tcpconn
}

var switchover int32 = 0

func setswitchover() {
    atomic.storeint32((*int32)(&switchover), 1)
}

func switchoverenabled() bool {
    return atomic.loadint32((*int32)(&switchover)) == 1
}

var proxies map[int32]*proxy = make(map[int32]*proxy, 0)
var proxyseq int32 = 0
var mu sync.rwmutex

func addproxy(from *net.tcpconn) {
    mu.lock()
    proxyseq += 1
    proxy := &proxy{id: proxyseq, from: from}
    proxies[proxyseq] = proxy
    mu.unlock()

    var toaddr string
    if switchoverenabled() {
        toaddr = "1.1.1.1"
    } else {
        toaddr = "8.8.8.8"
    }
    tcpaddr, _ := net.resolvetcpaddr("tcp4", toaddr)
    toconn, err := net.dialtcp("tcp", nil, tcpaddr)
    if err != nil {
        panic(err)
    }
    proxy.to = toconn
}

func switchovertonewendpoint() {
    mu.rlock()
    closedproxies := proxies
    mu.runlock()

    setswitchover()
    for _, proxy := range closedproxies {
        proxy.from.close()
        proxy.to.close()
        mu.lock()
        delete(proxies, proxy.id)
        mu.unlock()
    }
}

func main() {
    tcpaddr, _ := net.resolvetcpaddr("tcp4", "0.0.0.0:5432")
    ln, _ := net.listentcp("tcp", tcpaddr)
    go func() {
        time.sleep(time.second * 30)
        switchovertonewendpoint()
    }()
    for {
        clientconn, err := ln.accepttcp()
        if err != nil {
            panic(err)
        }
        go addproxy(clientconn)
    }
}

想了一会儿,我猜问题出在

mu.rlock()
    closedproxies := proxies
    mu.runlock()

但我不确定这是否是根本原因,以及是否可以通过将其替换为以下内容来修复它:

立即学习go语言免费学习笔记(深入)”;

closedProxies := make([]*Proxy, 0)
    mu.RLock()
    for _, proxy := range proxies {
        closedProxies = append(closedProxies, proxy)
    }
    mu.RUnlock()

由于该案例很难重现,所以有专业人士可以提供想法或提示吗?欢迎任何评论。提前致谢。

解决方法

问题

改变是必要的。在最初的实现中, latedproxies 持有相同的映射。请参阅此演示:

网格图片手风琴jquery特效代码
网格图片手风琴jquery特效代码

网格图片手风琴jquery特效代码,结合网格手风琴缩略图和手风琴面板的功能,给你展示你的图片网站一个有趣的方法。你可以选择使用XML或HTML。功能强大的API将允许进一步提高这个jQuery插件的功能,可以方便地集成到您自己的应用程序。兼容主流浏览器,php中文网推荐下载! 使用方法: 1、在head区域引入样式表文件style.css和grid-accordion.css 2、在head

下载
package main

import "fmt"

func main() {
    proxies := make(map[int]int, 0)
    for i := 0; i < 10; i++ {
        proxies[i] = i
    }

    closeproxies := proxies

    proxies[10] = 10
    proxies[11] = 11

    for k := range closeproxies {
        delete(proxies, k)
    }

    fmt.printf("items left: %d\n", len(proxies))
    // output:
    //   items left: 0
}

但这不是根本原因。可以在复制 closeproxies 之后但在调用 setswitchover 之前添加新代理。在这种情况下,新代理连接到旧地址,但不在 closeproxies 中。我认为这是根本原因。

还有一个问题。在设置 to 字段之前,将向 proxies 添加新代理。程序可能希望在设置 to 字段之前关闭此代理,从而导致恐慌。

可靠的设计

这个想法是将所有端点放入一个切片中,并让每个端点管理自己的代理列表。所以我们只需要跟踪当前端点的索引。当我们想要切换到另一个端点时,我们只需要更改索引,并告诉过时的端点清除其代理。剩下的唯一复杂的事情是确保过时的端点可以清除其所有代理。请参阅下面的实现:

manager.go

这就是这个想法的实现。

package main

import (
    "sync"
)

// conn is abstraction of a connection to make manager easy to test.
type conn interface {
    close() error
}

// dialer is abstraction of a dialer to make manager easy to test.
type dialer interface {
    dial(addr string) (conn, error)
}

type manager struct {
    // mucurrent protects the "current" member.
    mucurrent sync.rwmutex
    current   int // when current is -1, the manager is shuted down.
    endpoints []*endpoint

    // mu protects the whole switch action.
    mu sync.mutex
}

func newmanager(dialer dialer, addresses ...string) *manager {
    if len(addresses) < 2 {
        panic("a manger should handle at least 2 addresses")
    }

    endpoints := make([]*endpoint, len(addresses))
    for i, addr := range addresses {
        endpoints[i] = &endpoint{
            address: addr,
            dialer:  dialer,
        }
    }
    return &manager{
        endpoints: endpoints,
    }
}

func (m *manager) addproxy(from conn) {
    // 1. addproxy will wait when the write lock of m.mucurrent is taken.
    // once the write lock is released, addproxy will connect to the new endpoint.
    // switch only holds the write lock for a short time, and switch is called
    // not so frequently, so addproxy won't wait too much.
    // 2. switch will wait if there is any addproxy holding the read lock of
    // m.mucurrent. that means switch waits longer. the advantage is that when
    // e.clear is called in switch, all addproxy requests to the old endpoint
    // are done. so it's safe to call e.clear then.
    m.mucurrent.rlock()
    defer m.mucurrent.runlock()

    current := m.current

    // do not accept any new connection when m has been shutdown.
    if current == -1 {
        from.close()
        return
    }

    m.endpoints[current].addproxy(from)
}

func (m *manager) switch() {
    // in a real world, switch is called not so frequently.
    // so it's ok to add a lock here.
    // and it's necessary to make sure the old endpoint is cleared and ready
    // for use in the future.
    m.mu.lock()
    defer m.mu.unlock()

    // take the write lock of m.mucurrent.
    // it waits for all the addproxy requests holding the read lock to finish.
    m.mucurrent.lock()
    old := m.current

    // do nothing when m has been shutdown.
    if old == -1 {
        m.mucurrent.unlock()
        return
    }
    next := old + 1
    if next >= len(m.endpoints) {
        next = 0
    }
    m.current = next
    m.mucurrent.unlock()

    // when it reaches here, all addproxy requests to the old endpoint are done.
    // and it's safe to call e.clear now.
    m.endpoints[old].clear()
}

func (m *manager) shutdown() {
    m.mu.lock()
    defer m.mu.unlock()

    m.mucurrent.lock()
    current := m.current
    m.current = -1
    m.mucurrent.unlock()

    m.endpoints[current].clear()
}

type proxy struct {
    from, to conn
}

type endpoint struct {
    address string
    dialer  dialer

    mu      sync.mutex
    proxies []*proxy
}

func (e *endpoint) clear() {
    for _, p := range e.proxies {
        p.from.close()
        p.to.close()
    }

    // assign a new slice to e.proxies, and the gc will collect the old one.
    e.proxies = []*proxy{}
}

func (e *endpoint) addproxy(from conn) {
    toconn, err := e.dialer.dial(e.address)
    if err != nil {
        // close the from connection so that the client will reconnect?
        from.close()
        return
    }

    e.mu.lock()
    defer e.mu.unlock()
    e.proxies = append(e.proxies, &proxy{from: from, to: toconn})
}

main.go

这个演示展示了如何使用之前实现的manager类型:

package main

import (
    "net"
    "time"
)

type realdialer struct{}

func (d realdialer) dial(addr string) (conn, error) {
    tcpaddr, err := net.resolvetcpaddr("tcp4", addr)
    if err != nil {
        return nil, err
    }
    return net.dialtcp("tcp", nil, tcpaddr)
}

func main() {
    manager := newmanager(realdialer{}, "1.1.1.1", "8.8.8.8")

    tcpaddr, _ := net.resolvetcpaddr("tcp4", "0.0.0.0:5432")
    ln, _ := net.listentcp("tcp", tcpaddr)

    go func() {
        for range time.tick(30 * time.second) {
            manager.switch()
        }
    }()
    for {
        clientconn, err := ln.accepttcp()
        if err != nil {
            panic(err)
        }
        go manager.addproxy(clientconn)
    }
}

manager_test.go

使用以下命令运行测试:go test ./... -race -count 10

package main

import (
    "errors"
    "math/rand"
    "sync"
    "sync/atomic"
    "testing"
    "time"

    "github.com/google/uuid"
)

func TestManager(t *testing.T) {
    addresses := []string{"1.1.1.1", "8.8.8.8"}
    dialer := newDialer(addresses...)
    manager := NewManager(dialer, addresses...)

    ch := make(chan int, 1)

    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        for range ch {
            manager.Switch()
        }
        wg.Done()
    }()

    count := 1000
    total := count * 10

    wg.Add(total)

    fromConn := &fakeFromConn{}
    for i := 0; i < total; i++ {
        if i%count == count-1 {
            ch <- 0
        }
        go func() {
            manager.AddProxy(fromConn)
            wg.Done()
        }()
    }
    close(ch)

    wg.Wait()

    manager.Shutdown()

    for _, s := range dialer.servers {
        left := len(s.conns)
        if left != 0 {
            t.Errorf("server %s, unexpected connections left: %d", s.addr, left)
        }
    }

    closedCount := fromConn.closedCount.Load()

    if closedCount != int32(total) {
        t.Errorf("want closed count: %d, got: %d", total, closedCount)
    }
}

type fakeFromConn struct {
    closedCount atomic.Int32
}

func (c *fakeFromConn) Close() error {
    c.closedCount.Add(1)

    return nil
}

type fakeToConn struct {
    id     uuid.UUID
    server *fakeServer
}

func (c *fakeToConn) Close() error {
    if c.id == uuid.Nil {
        return nil
    }

    c.server.removeConn(c.id)

    return nil
}

type fakeServer struct {
    addr  string
    mu    sync.Mutex
    conns map[uuid.UUID]bool
}

func (s *fakeServer) addConn() (uuid.UUID, error) {
    s.mu.Lock()
    defer s.mu.Unlock()

    id, err := uuid.NewRandom()
    if err == nil {
        s.conns[id] = true
    }
    return id, err
}

func (s *fakeServer) removeConn(id uuid.UUID) {
    s.mu.Lock()
    defer s.mu.Unlock()

    delete(s.conns, id)
}

type fakeDialer struct {
    servers map[string]*fakeServer
}

func newDialer(addresses ...string) *fakeDialer {
    servers := make(map[string]*fakeServer)
    for _, addr := range addresses {
        servers[addr] = &fakeServer{
            addr:  addr,
            conns: make(map[uuid.UUID]bool),
        }
    }
    return &fakeDialer{
        servers: servers,
    }
}

func (d *fakeDialer) Dial(addr string) (Conn, error) {
    n := rand.Intn(100)
    if n == 0 {
        return nil, errors.New("fake network error")
    }
    // Simulate network latency.
    time.Sleep(time.Duration(n) * time.Millisecond)

    s := d.servers[addr]
    id, err := s.addConn()
    if err != nil {
        return nil, err
    }
    conn := &fakeToConn{
        id:     id,
        server: s,
    }
    return conn, nil
}

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

通义千问
通义千问

阿里巴巴推出的全能AI助手

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
golang如何定义变量
golang如何定义变量

golang定义变量的方法:1、声明变量并赋予初始值“var age int =值”;2、声明变量但不赋初始值“var age int”;3、使用短变量声明“age :=值”等等。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

182

2024.02.23

golang有哪些数据转换方法
golang有哪些数据转换方法

golang数据转换方法:1、类型转换操作符;2、类型断言;3、字符串和数字之间的转换;4、JSON序列化和反序列化;5、使用标准库进行数据转换;6、使用第三方库进行数据转换;7、自定义数据转换函数。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

229

2024.02.23

golang常用库有哪些
golang常用库有哪些

golang常用库有:1、标准库;2、字符串处理库;3、网络库;4、加密库;5、压缩库;6、xml和json解析库;7、日期和时间库;8、数据库操作库;9、文件操作库;10、图像处理库。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

343

2024.02.23

golang和python的区别是什么
golang和python的区别是什么

golang和python的区别是:1、golang是一种编译型语言,而python是一种解释型语言;2、golang天生支持并发编程,而python对并发与并行的支持相对较弱等等。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

210

2024.03.05

golang是免费的吗
golang是免费的吗

golang是免费的。golang是google开发的一种静态强类型、编译型、并发型,并具有垃圾回收功能的开源编程语言,采用bsd开源协议。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

396

2024.05.21

golang结构体相关大全
golang结构体相关大全

本专题整合了golang结构体相关大全,想了解更多内容,请阅读专题下面的文章。

240

2025.06.09

golang相关判断方法
golang相关判断方法

本专题整合了golang相关判断方法,想了解更详细的相关内容,请阅读下面的文章。

193

2025.06.10

golang数组使用方法
golang数组使用方法

本专题整合了golang数组用法,想了解更多的相关内容,请阅读专题下面的文章。

438

2025.06.17

java入门学习合集
java入门学习合集

本专题整合了java入门学习指南、初学者项目实战、入门到精通等等内容,阅读专题下面的文章了解更多详细学习方法。

1

2026.01.29

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
C++教程
C++教程

共115课时 | 14.7万人学习

【web前端】Node.js快速入门
【web前端】Node.js快速入门

共16课时 | 2万人学习

php-src源码分析探索
php-src源码分析探索

共6课时 | 0.5万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号