ringbuffer 无锁队列_wear ring

ringbuffer 无锁队列_wear ring最近常收到SOD框架的朋友报告的SOD的SQL日志功能报错:文件句柄丢失。经过分析得知,这些朋友使用SOD框架开发了访问量比较大的系统,由于忘记关闭SQL日志功能所以出现了很高频率的日志写入操作,从而偶然引起错误。后来我建议只记录出错的或者执行时间较长的SQL信息,暂时解决了此问题。但是作为一个热心造轮子的人,一定要看看能不能造一个更好的轮子出来。前面说的错误原因已经很直白了,就是频繁的日志写入导…

大家好,又见面了,我是你们的朋友全栈君。如果您正在找激活码,请点击查看最新教程,关注关注公众号 “全栈程序员社区” 获取激活教程,可能之前旧版本教程已经失效.最新Idea2022.1教程亲测有效,一键激活。

Jetbrains全系列IDE使用 1年只要46元 售后保障 童叟无欺

最近常收到SOD框架的朋友报告的SOD的SQL日志功能报错:文件句柄丢失。经过分析得知,这些朋友使用SOD框架开发了访问量比较大的系统,由于忘记关闭SQL日志功能所以出现了很高频率的日志写入操作,从而偶然引起错误。后来我建议只记录出错的或者执行时间较长的SQL信息,暂时解决了此问题。但是作为一个热心造轮子的人,一定要看看能不能造一个更好的轮子出来。

前面说的错误原因已经很直白了,就是频繁的日志写入导致的,那么解决方案就是将多次写入操作合并成一次写入操作,并且采用异步写入方式。要保存多次操作的内容就要有一个类似“队列”的东西来保存,而一般的线程安全的队列,都是“有锁队列”,在性能要求很高的系统中,不希望在日志记录这个地方耗费多一点计算资源,所以最好有一个“无锁队列”,因此最佳方案就是Ring Buffer(环形缓冲区)了。

什么是Ring Buffer?顾名思义,就是一个内存环,每一次读写操作都循环利用这个内存环,从而避免频繁分配和回收内存,减轻GC压力,同时由于Ring Buffer可以实现为无锁的队列,从而整体上大幅提高系统性能。Ring Buffer的示意图如下,有关具体原理,请参考此文《Ring Buffer 有什么特别?》。

689b5ec31a15e1da15a1e11d2421c827.png

上文并没有详细说明如何具体读写Ring Buffer,但是原理介绍已经足够我们怎么写一个Ring Buffer程序了,接下来看看我在 .NET上的实现。

首先,定一个存放数据的数组,记住一定要用数组,它是实现Ring Buffer的关键并且CPU友好。

const int C_BUFFER_SIZE = 10;//写入次数缓冲区大小,每次的实际内容大小不固定

string[] RingBuffer = new string[C_BUFFER_SIZE];

int writedTimes = 0;

变量writedTimes 记录写入次数,它会一直递增,不过为了线程安全的递增且不使用托管锁,需要使用原子锁Interlocked。之后,根据每次 writedTimes 跟环形缓冲区的大小求余数,得到当前要写入的数组位置:

void SaveFile(string fileName, stringtext)

{int currP= Interlocked.Increment(refwritedTimes);int writeP= currP %C_BUFFER_SIZE ;int index = writeP == 0 ? C_BUFFER_SIZE – 1 : writeP – 1;

RingBuffer[index]= “Arr[” + index + “]:” +text;

}

Ring Buffer的核心代码就这么点,调用此方法,会一直往缓冲区写入数据而不会“溢出”,所以写入Ring Buffer效率很高。

一个队列如果只生产不消费肯定不行的,那么如何及时消费Ring Buffer的数据呢?简单的方案就是当Ring Buffer“写满”的时候一次性将数据“消费”掉。注意这里的“写满”仅仅是指写入位置 index达到了数组最大索引位置,而“消费”也不同于常见的堆栈,队列等数据结构,只是读取缓冲区的数据而不会移除它。

所以前面的代码只需要稍加改造:

void SaveFile(string fileName, stringtext)

{int currP= Interlocked.Increment(refwritedTimes);int writeP= currP %C_BUFFER_SIZE ;int index = writeP == 0 ? C_BUFFER_SIZE – 1 : writeP – 1;

RingBuffer[index]= “Arr[” + index + “]:” +text;if (writeP == 0)

{string result = string.Concat( RingBuffer);

FlushFile(fileName, result);

}

}

writeP == 0 表示当前一轮的缓冲区已经写满,然后调用函数 FlushFile 将Ring Buffer的数据连接起来,整体写入文件。

void FlushFile(string fileName, stringtext)

{using (FileStream fs = new FileStream(fileName, FileMode.Append, FileAccess.Write, FileShare.Write, 2048, FileOptions.Asynchronous))

{byte[] buffer =System.Text.Encoding.UTF8.GetBytes(text);

IAsyncResult writeResult= fs.BeginWrite(buffer, 0, buffer.Length,

(asyncResult)=>{

fs.EndWrite(asyncResult);

},

fs);//fs.EndWrite(writeResult);//这种方法异步起不到效果

fs.Flush();

}

}

在函数 FlushFile 中我们使用了异步写入文件的技术,注意 FileOptions.Asynchronous ,使用它才可以真正利用Windows的完成端口IOCP,将文件异步写入。

当然这段代码也可以使用.NET最新版本支持的 async/await ,不过我要让SOD框架继续支持.NET 2.0,所以只好这样写了。

现在,我们可以开多线程来测试这个循环队列效果怎么样:

Task[] arrTask = new Task[20];for (int i = 0; i < arrTask.Length; i++)

{

arrTask[i]= new Task(obj => SaveFile( (int)obj) ,i);

}for (int i = 0; i < arrTask.Length; i++)

{

arrTask[i].Start();

}

Task.WaitAll(arrTask);

MessageBox.Show(arrTask.Length+”Task All OK.”);

这里开启20个Task任务线程来写入文件,运行此程序,发现20个线程才写入了10条数据,分析很久才发现,文件异步IO太快的话,会有缓冲区丢失,第一次写入的10条数据无法写入文件,多运行几次就没有问题了。所以还是得想法解决此问题。

通常情况下我们都是使用托管锁来解决这种并发问题,但本文的目的就是要实现一个“无锁环形缓冲区”,不能在此“功亏一篑”,所以此时“信号量”上场了。

同步可以分为锁定和信号同步,信号同步机制中涉及的类型都继承自抽象类WaitHandle,这些类型有EventWaitHandle(类型化为AutoResetEvent、ManualResetEvent)、Semaphore以及Mutex。见下图:

b90ce8233de6f0733d8cb0335bd187bd.png

首先声明一个 ManualResetEvent对象:

ManualResetEvent ChangeEvent = new ManualResetEvent(true);

这里我们将 ManualResetEvent 对象设置成 “终止状态”,意味着程序一开始是允许所有线程不等待的,当我们需要消费Ring Buffer的时候再将  ManualResetEvent 设置成“非终止状态”,阻塞其它线程。简单说就是当要写文件的时候将环形缓冲区阻塞,直到文件写完才允许继续写入环形缓冲区。

对应的新的代码调整如下:

void SaveFile(string fileName, stringtext)

{

ChangeEvent.WaitOne();int currP= Interlocked.Increment(refwritedTimes);int writeP= currP %C_BUFFER_SIZE ;int index = writeP == 0 ? C_BUFFER_SIZE – 1 : writeP – 1;

RingBuffer[index]= “Arr[” + index + “]:” +text;if (writeP == 0)

{

ChangeEvent.Reset();string result = string.Concat( RingBuffer);

FlushFile(fileName, result);

}

}

然后,再FlushFile 方法的 回掉方法中,加入设置终止状态的代码,部分代码如下:

(asyncResult) =>{

fs.EndWrite(asyncResult);

ChangeEvent.Set();

}

OK,现在我们的程序具备高性能的安全的写入日志文件的功能了,我们来看看演示程序测试的日志结果实例:

Arr[0]:Thread index:0–FFFFFFF

Arr[1]:Thread index:1–FFFFFFF

Arr[2]:Thread index:8–FFFFFFF

Arr[3]:Thread index:9–FFFFFFF

Arr[4]:Thread index:3–FFFFFFF

Arr[5]:Thread index:2–FFFFFFF

Arr[6]:Thread index:4–FFFFFFF

Arr[7]:Thread index:10–FFFFFFF

Arr[8]:Thread index:5–FFFFFFF

Arr[9]:Thread index:6–FFFFFFF

Arr[0]:Thread index:7–FFFFFFF

Arr[1]:Thread index:11–FFFFFFF

Arr[2]:Thread index:12–FFFFFFF

Arr[3]:Thread index:13–FFFFFFF

Arr[4]:Thread index:14–FFFFFFF

Arr[5]:Thread index:15–FFFFFFF

Arr[6]:Thread index:16–FFFFFFF

Arr[7]:Thread index:17–FFFFFFF

Arr[8]:Thread index:18–FFFFFFF

Arr[9]:Thread index:19–FFFFFFF

测试结果符合预期!

到此,我们今天的主题就全部介绍完成了,不过要让本文的代码能够符合实际的运行,还要解决每次只写入少量数据并且将它定期写入日志文件的问题,这里贴出真正的局部代码:

f9a069730c2849bd7687110b1f55617c.png

PS:有朋友说采用信号量并不能完全保证程序安全,查阅了MSDN也说如果信号量状态改变还没有来得及应用,那么是起不到作用的,所以还需要检查业务状态标记,也就是在设置非终止状态后,马上设置一个操作标记,在其它线程中,需要检查此标记,以避免“漏网之鱼”引起不期望的结果。

再具体实现上,我们可以实现一个“自旋锁”,循环检查此状态标记,为了防止发生死锁,还需要有锁超时机制,代码如下:

void SaveFile(string fileName, stringtext)

{

ChangeEvent.WaitOne(10000);int currP= Interlocked.Increment(refWritedTimes);int writeP= currP %C_BUFFER_SIZE ;int index = writeP == 0 ? C_BUFFER_SIZE – 1 : writeP – 1;if (writeP == 0)

{

ChangeEvent.Reset();

IsReading= true;

RingBuffer[index]= “Arr[” + index + “]:” +text;

LastWriteTime=DateTime.Now;

WritingIndex= 0;

SaveFile(fileName,RingBuffer);

}else if (DateTime.Now.Subtract(LastWriteTime).TotalSeconds >C_WRITE_TIMESPAN)

{

ChangeEvent.Reset();

IsReading= true;

RingBuffer[index]= “Arr[” + index + “]:” +text;int length = index – WritingIndex + 1;if (length <= 0)

length= 1;string[] newArr = new string[length];

Array.Copy(RingBuffer, WritingIndex, newArr,0, length);

LastWriteTime=DateTime.Now;

WritingIndex= index + 1;

SaveFile(fileName, newArr);

}else{//防止漏网之鱼的线程在信号量产生作用之前修改数据//采用“自旋锁”等待

int count = 0;while(IsReading)

{if (count++ > 10000000)

{

Thread.Sleep(50);break;

}

}

RingBuffer[index]= “Arr[” + index + “]:” +text;

}

}

完整的Ring Buffer代码会在最新版本的SOD框架源码中,有关本篇文章测试程序的完整源码,请加QQ群讨论获取,

群号码:SOD框架高级群 18215717 ,加群请注明 PDF.NET技术交流 ,否则可能被拒绝。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/195376.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • vue路由传参的两种方式的区别_vue路由跳转获取参数

    vue路由传参的两种方式的区别_vue路由跳转获取参数vue路由传参的两种方式

    2025年5月23日
    0
  • iscsiadm命令详解_iscsi 局域网

    iscsiadm命令详解_iscsi 局域网启动iscsi守护进程serviceiscsistart发现目标iscsiadm-mdiscovery-tsendtargets-p192.168.1.1:3260-mdiscovery指定模式为discovery-p192.168.1.1:3260指定目标ip和端口登入节点iscsiadm-mnode–Tiqn.19…

    2022年8月23日
    5
  • go语言的type func()用法

    go语言的type func()用法在 go 语言中 type 可以定义任何自定义的类型比如熟悉的 typedogstruc typemyIntint 等等所以 func 也是可以作为类型自定义的 typemyFuncfu int int 意思是自定义了一个叫 myFunc 的函数类型 这个函数的签名必须符合输入为 int 输出为 int 已知 相同底层类型的变量之间是可以相互转换的 例如从一个取值范围小的 int16 转为取值范围大的 int32 所以 自定义的 myInt 和 int 之间也是可以转换的 typemyIn

    2025年6月8日
    0
  • FusionChartsFree的简单用法[通俗易懂]

    FusionChartsFree的简单用法[通俗易懂]  今天发现个不错的显示图表的东西—-FusionChartsFree,有免费版的,有收费版的,免费版的我用着就不错。收费的可能更好一点儿。  看了看官方提供的例子,我是在JSP中使用,想到一个简单的用法,贴在下面:[code="html"]结果显示 FusionCharts. varchart…

    2022年7月13日
    14
  • Mustache 使用心得总结

    Mustache 使用心得总结

    2021年12月15日
    41
  • js模糊查询

    js模糊查询$(function(){var$resourceTitle=$(“#resourceTitle”);$resourceTitle.on(‘keyup’,function(){varresourceTitle=$.trim($resourceTitle.val());query();})functionquery(){if(resourceTitle.length==0){

    2022年5月29日
    46

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号