实验要求
5.2 构建线程A1、A2… Ak(k>=3)和线程B的程序(k生产者和单消费者)。A1、A2… Ak从磁盘各自读取一个文本文件,写入到内存中的固定的容器(如Pool)。A1、A2… Ak读取每一行时,都会休眠,然后在随机的时间(10–100 ms)醒来继续尝试执行。程序要求按照A1、A2… Ak的顺序写入Pool。B会观察Pool的状态,如果有新数据,则进行读取,否则B处于等待状态。注意,A1、A2… Ak不能互相干扰。当所有的文件被读取完毕,且B读取完毕时,程序结束。
不推荐阅读
以下部分内容表述较不准确;由于我个人对多线程掌握得并不是很优秀,代码也不一定完全正确。但作为应付作业已经足够,谨慎阅读 。
实验环境
Visual Studio 2019 + C#开发环境
算法设计
由于k个读取文件的线程内容基本相同,我们便只介绍一个,其他都相同,不再赘述:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public void ReadA1(object stateInfo) { Monitor.Enter(this); if (flag != 1) Monitor.Wait(this); Console.WriteLine("enter 1"); FileStream file = File.OpenRead("text_a1.txt"); StreamReader stream = new StreamReader(file, System.Text.Encoding.Default); file.Seek(0, SeekOrigin.Begin); while (stream.Peek() > -1) { strFromTxt = stream.ReadLine(); Thread.Sleep(r.Next(10, 100)); writeFlag = 1; } Console.WriteLine("quit 1"); flag = 2; Monitor.Pulse(this); Monitor.Exit(this); }
|
首先介绍一下 Monitor.Wait()
与 Monitor.Pulse()
函数:前者用来将当前线程置于等待序列,直到其被移出等待序列后开始执行;而后者则是将当前锁的其他线程移出队列开始执行,发出一段脉冲信号。我们可以用这两个函数加上一个 flag
标记实现线程的异步,达到顺序执行的效果。
由于实验要求中需要我们每读一行字符就写入B中一次,所以我们需要一个标记 writeFlag
来判断是否需要写入。
再来看写入文件的线程B:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public void WriteB(object stateInfo) { FileStream file = File.OpenWrite("text_b.txt"); StreamWriter stream = new StreamWriter(file, System.Text.Encoding.Default); while (true) { if (writeFlag ==1) { stream.WriteLine(strFromTxt); writeFlag = 0; } if (flag == 0) break; } stream.Close(); }
|
由于线程B需要随时运行以进行实时读入文件,其并不需要加锁,可以自由运行。但循环中需要通过 writeFlag
标记来判断是否需要写入。最终当三个线程都读取完后,flag
标记变为0,即退出循环,并关闭文件进行保存。
1 2 3 4 5 6 7 8 9
| public void Start() { ThreadPool.QueueUserWorkItem(new WaitCallback(ReadA1)); ThreadPool.QueueUserWorkItem(new WaitCallback(ReadA2)); ThreadPool.QueueUserWorkItem(new WaitCallback(ReadA3)); ThreadPool.QueueUserWorkItem(new WaitCallback(WriteB)); while (flag != 0) ; Thread.Sleep(r.Next(10, 100)); }
|
而在主函数中,我们则通过线程池直接创建线程,将四个线程都加入线程池,直到 flag = 0
,即线程被运行完后,稍作等待所有操作完成后,退出程序。
完整代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109
| using System; using System.IO; using System.Threading;
namespace ThreadTest { class Program { static void Main(string[] args) { ReadAndWrite readAndWrite = new ReadAndWrite(); readAndWrite.Start(); } } class ReadAndWrite { int flag; string strFromTxt; int writeFlag; Random r = new Random(); public ReadAndWrite() { flag = 1; writeFlag = 0; strFromTxt = ""; } public void ReadA1(object stateInfo) { Monitor.Enter(this); if (flag != 1) Monitor.Wait(this); Console.WriteLine("enter 1"); FileStream file = File.OpenRead("text_a1.txt"); StreamReader stream = new StreamReader(file, System.Text.Encoding.Default); file.Seek(0, SeekOrigin.Begin); while (stream.Peek() > -1) { strFromTxt = stream.ReadLine(); Thread.Sleep(r.Next(10, 100)); writeFlag = 1; } Console.WriteLine("quit 1"); flag = 2; Monitor.Pulse(this); Monitor.Exit(this); } public void ReadA2(object stateInfo) { Monitor.Enter(this); if (flag != 2) Monitor.Wait(this); Console.WriteLine("enter 2"); FileStream file = File.OpenRead("text_a2.txt"); StreamReader stream = new StreamReader(file, System.Text.Encoding.Default); file.Seek(0, SeekOrigin.Begin); while (stream.Peek() > -1) { strFromTxt = stream.ReadLine(); Thread.Sleep(r.Next(10, 100)); writeFlag = 1; } Console.WriteLine("quit 2"); flag = 3; Monitor.Pulse(this); Monitor.Exit(this); } public void ReadA3(object stateInfo) { Monitor.Enter(this); if (flag != 3) Monitor.Wait(this); Console.WriteLine("enter 3"); FileStream file = File.OpenRead("text_a3.txt"); StreamReader stream = new StreamReader(file, System.Text.Encoding.Default); file.Seek(0, SeekOrigin.Begin); while (stream.Peek() > -1) { strFromTxt = stream.ReadLine(); Thread.Sleep(r.Next(10, 100)); writeFlag = 1; } Console.WriteLine("quit 3"); flag = 0; Monitor.Pulse(this); Monitor.Exit(this); } public void WriteB(object stateInfo) { FileStream file = File.OpenWrite("text_b.txt"); StreamWriter stream = new StreamWriter(file, System.Text.Encoding.Default); while (true) { if (writeFlag ==1) { stream.WriteLine(strFromTxt); writeFlag = 0; } if (flag == 0) break; } stream.Close(); } public void Start() { ThreadPool.QueueUserWorkItem(new WaitCallback(ReadA1)); ThreadPool.QueueUserWorkItem(new WaitCallback(ReadA2)); ThreadPool.QueueUserWorkItem(new WaitCallback(ReadA3)); ThreadPool.QueueUserWorkItem(new WaitCallback(WriteB)); while (flag != 0) ; Thread.Sleep(r.Next(10, 100)); } } }
|
测试数据
//text_a1:
1 2 3 4 5 6
| a_line1 a_line2 a_line3 a_line4 a_line5 ----a_end----
|
//text_a2:
1 2 3 4 5
| b_line1 b_line2 b_line3 b_line4 ----b_end----
|
//text_a3:
1 2 3 4
| c_line1 c_line2 c_line3 ----c_end----
|
//text_b(output):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| a_line1 a_line2 a_line3 a_line4 a_line5 ----a_end---- b_line1 b_line2 b_line3 b_line4 ----b_end---- c_line1 c_line2 c_line3 ----c_end----
|