-
Notifications
You must be signed in to change notification settings - Fork 891
/
Copy path持续集成系统.html
586 lines (436 loc) · 58.3 KB
/
持续集成系统.html
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
<html lang="zh-Hans" dir="ltr">
<head>
<meta charset="utf-8"></meta>
<meta name="viewport" content="width=device-width, initial-scale=1.0"></meta>
<meta name="provenance" content="$Id: index.html 1472 2012-09-21 22:17:41Z audrey $"></meta>
<link rel="stylesheet" href="http://aosabook.org/en/500L/theme/css/bootstrap.css" type="text/css"></link>
<link rel="stylesheet" href="http://aosabook.org/en/500L/theme/css/bootstrap-responsive.css" type="text/css"></link>
<link rel="stylesheet" href="http://aosabook.org/en/500L/theme/css/code.css" type="text/css"></link>
<link rel="stylesheet" href="http://aosabook.org/en/500L/theme/css/500L.css" type="text/css"></link>
<title>500 Lines or Less |持续集成系统</title>
<script type="text/x-mathjax-config">
MathJax.Hub.Config({
tex2jax: {
inlineMath: [['$','$'], ['\\(','\\)']],
displayMath: [ ['$$','$$'], ["\\[","\\]"] ],
},
});
</script>
<script type="text/javascript" src="http://cdn.mathjax.org/mathjax/latest/MathJax.js?config=TeX-AMS-MML_HTMLorMML">
</script>
</head>
<body dir="ltr">
<div class="container">
<div class="row">
<div class="hero-unit">
<a class="pull-right" href="http://aosabook.org/en/index.html"></a>
<h1>持续集成系统</h1>
<h2 class="author">Malini Das</h2>
<blockquote class="pull-right">
</blockquote>
</div>
</div>
<div class="row">
<div class="span10 offset1" id="content">
<p><em>Malini Das 是一个致力于改善编码速度(当然是在保证代码安全的前提下),并不断寻求交叉编程的解决方案的软件工程师。她曾以工具工程师的身份供职于Mozilla,现在她在Twitch工作。可以通过关注Malini的<a href="https://twitter.com/malinidas">Twitter</a>或是她的<a href="http://malinidas.com/">博客</a>来了解她的最新动态。</em></p>
<h2 id="what-is-a-continuous-integration-system">什么是持续集成系统?</h2>
<p>在软件开发的过程中,我们希望能保证每一个新功能都能稳定的实现,每一个Bug都能按照预期得到修复。通常来讲这种方式就是对代码进行测试。多数情况下,开发人员会在开发环境中直接进行测试来确保功能实现的完整和稳定,很少有人会有时间在每一种可能的运行环境中都进行测试。此外,随着开发的不断进行,需要进行的测试不断的增加,在开发环境中对代码进行完全的测试的可行性也随之变得越来越低。持续集成系统的出现,正是为了解决这种开发中的困境。</p>
<p>持续集成(CI)系统是专门用来对新代码进行测试的系统。持续集成系统的职责就是在每次一段新的代码被提交时,确保这些新代码不会导致之前测试样例的失败。为此,持续集成系统就必须做到自动获取到最新的代码改动,自动完成测试,并生成测试报告。同时,持续集成系统还需要保证良好的稳定性。也就是说,当系统的任何一部分出现错误甚至崩溃时,整个系统应该可以从上一次中断的地方重新恢复运行。</p>
<p>这个系统同样需要均衡负载的能力,这样一来当提交新版本的时间比运行测试的时间还要短的时候,我们仍然可以保证在一个合理的时间内获得测试的结果。这一点我们可以通过多节点任务分发及并行化运行测试样例来实现。本项目中将介绍一个小型可拓展的极简分布式持续集成系统。</p>
<h2 id="project-limitations-and-notes">注意事项及相关说明</h2>
<p>在本项目中使用Git作为进行测试的代码托管系统。我们只会调用标准的代码管理指令,所以,如果你并不熟悉Git的操作,但对于使用其他像svn或者Mercurial这样的版本控制系统(VCS)很熟悉,那么你完全可以基于其他的版本控制系统来完成我们的教程。</p>
<p>由于代码长度和单元测试的限制,我简化了测试样例搜索的机制。我们将<em>仅</em>运行代码库中<code>tests</code>目录下的测试样例。</p>
<p>通常来讲,持续集成系统监听的应该是远程代码托管库的变化。但是为了方便起见,在我们的示例之中,我们选择监听本地的代码库文件来代替远程文件。</p>
<p>持续集成系统并不一定按照固定的时间表运行。你也可以设定成每一次或几次提交时自动运行。在我们的例子中,我们将CI设定为定期运行。也就是说,如果我们设定CI系统设定为5秒钟运行一次,那么每隔5秒系统就会对5秒内最近的一次的提交进行测试。注意,不论这5秒内发生了多少次提交,系统都只会对最后一次提交的结果进行一次测试。</p>
<p>CI系统旨在监听代码库中的变化。在实际中使用的CI系统可以通过代码库的通知来获取提交信息。例如,Github提供的“post-commit hooks”。它向指定的URL发送通知, 监听这个URL的Web服务器将调用CI系统的“代码库观测器”来响应该通知。但是这种模型在我们本地的试验环境中太过复杂了,所以我们使用了观察者模型。在这种模型中系统会主动检测代码变化,而不需要等待代码管理库的通知。</p>
<p>为了可以查看测试运行器所提交的测试结果,CI系统还需要一个报告组件(比如一个网页),来展示测试报告。为简单起见,本项目直接将测试结果以文件的形式存储在调度程序运行的机器本地。</p>
<p>注意,CI系统的框架有非常多种,在我们的项目中,只是讨论了众多CI系统框架中的一种。在这种框架中,我们将我们的项目简化成了三个主要组成部分。</p>
<h2 id="introduction">引言</h2>
<p>最基础的持续集成系统分为三个部分:监听器,测试样例调度器,和测试运行器首先监听器会时刻监视代码库的变化。当发生提交时,监听器会通知调度器。之后,样例调度器会将对应提交版本号的测试分配到测试运行器中去完成</p>
<p>这三部分的组合方式有很多。我们可以将他们全部运行在一台电脑的同一个线程之中。但是这样一来,我们的CI系统就会缺少了处理大负载的能力,当提交带来的测试比CI系统能够同时完成的测试还要多时,就会非常容易引起任务的积压。同时这种方案的容错率非常低,一旦运行该系统的计算机发生故障或是断电,没有回退系统,中断的测试就没有办法重新运行了。理想情况下,我们的CI系统应该根据需求尽可能同时的完成多项测试工作,并且在机器发生意外停机时可以尽可能的补救。</p>
<p>要构建一个负载能力强并且容错率又高的CI系统,就需要保证项目中上述的每一个组件都能以独立的进程运行。各部分都独立于其他的部分,并且每一部分可以同时运行多个实例。当有很多测试任务需要同时展开时,这种方案会带来非常大的便利。我们可以并行生成多个测试运行器,每个测试运行器独立工作,这样就可以有效的避免测试队列积压的问题。</p>
<p>在本项目中这些组件虽然是相互独立的运行在单独的线程上,但是线程之间可以通过套接字进行通信,这样我们就可以在一个网络内的不同主机上单独运行这些进程。我们会为每一个进程分配一个地址/端口,这样每个进程之间就可以通过向各自的地址发送消息来互相通信。</p>
<p>通过分布式的架构,我们可以做到在硬件发生错误动态的进行处理。我们可以把监听器,测样例调度器,和测试运行器分别运行在不同的机器上,他们可以通过网络保持相互通信。当他们之中的任何一个发生问题时,我们可以安排一台新的主机上线运行发生问题的进程。这样一来这个系统就会有非常高的容错率。</p>
<p>在本项目中,并没有包含自动恢复任务的代码。自动恢复的功能依赖于你使用的分布式系统的架构。在实际的使用中,CI系统通常运行在支持故障转移冗余(即,当分布式系统中的一个机器发生故障,任务会自动回退到该机型的备用机上)的分布式系统之中。</p>
<p>为了方便测试我们的系统,在本项目中我们将会在本地手动的触发一些进程来模拟分布式的环境。</p>
<h3 id="files-in-this-project">项目的文件结构</h3>
<p>项目中每个组件的Python文件结构如下:代码库监听器( <code>repo_observer.py</code> ),测试样例调度器( <code>dispatcher.py</code> )和测试运行器( <code>test_runner.py</code> )。上述每个线程之间通过套接字通信,我们将用于实现通信功能的代码统一的放在 helpers.py 中。这样就可以让每个组件直接从这个文件中导入相关功能,而不用再每个组件中重复的写这段代码。</p>
<p>另外,我们还用到了bash脚本。这些脚本用来执行一些简单的bash和git的操作,直接通过bash脚本要比利用Python提供的系统级别的模块(比如,os或者subprocess之类的)要更方便一些。</p>
<p>最后,我们还建立了一个tests目录来存放我们需要CI系统运行的测试样例。在这个目录中包含两个用于测试的样例,其中一个样例模拟了样例通过时的情况,另一个则模拟了失败时的情况。</p>
<h3 id="initial-setup">初始设置</h3>
<p>虽然我们的CI系统是为分布式的运行而设计的,但我们会先在一台计算机上本地运行所有内容,这样我们就可以专注于掌握CI系统的工作原理,而不必受可能出现的各种网络问题的干扰。当然,如果你想要试一试分布式的运行环境,你也可以将每一个组件分别运行到不同的主机上。</p>
<p>持续集成系统通过监听代码的变动来触发测试,所以在开始之前我们需要设置一个用于监听的代码库。</p>
<p>我们称这个用于测试的项目为<code>test_repo</code> :</p>
<pre class="sourceCode bash"><code class="sourceCode bash">$ <span class="kw">mkdir</span> test_repo
$ <span class="kw">cd</span> test_repo
$ <span class="kw">git</span> init</code></pre>
<p>这将是我们的代码库。开发人员的代码会提交到这个代码库中,我们的CI系统会拉取此代码库的代码并检查提交,然后运行测试。在我们的系统中代码库监听器模块会完成监听新的代码提交的工作。</p>
<p>监听器模块通过检查commit(提交)来进行代码更新的监听,所以我们至少需要一次的commit才能进行监听器模块的测试。让我们把测试样例的代码先提交到代码库中,以便我们执行这些测试。</p>
<p>将tests文件夹复制到<code>test_repo</code>中,然后提交:</p>
<pre class="sourceCode bash"><code class="sourceCode bash">$ <span class="kw">cp</span> -r /this/directory/tests /path/to/test_repo/
$ <span class="kw">cd</span> /path/to/test\_repo
$ <span class="kw">git</span> add tests/
$ <span class="kw">git</span> commit -m ”add tests”</code></pre>
<p>现在,在我们测试用的代码仓库中的master分支上有了一次可以用来测试的提交。</p>
<p>监听器模块需要一份单独的代码副本来检测新的提交。让我们为主代码库创建一个副本,并将其命名为<code>test_repo_clone_obs</code> :</p>
<pre class="sourceCode bash"><code class="sourceCode bash">$ <span class="kw">git</span> clone /path/to/test_repo test_repo_clone_obs</code></pre>
<p>测试运行器也需要一份自己的代码副本来提取用于测试样例代码。让我们再创建另一个主代码库的副本,并将其命名为<code>test_repo_clone_runner</code> :</p>
<pre class="sourceCode bash"><code class="sourceCode bash">$ <span class="kw">git</span> clone /path/to/test_repo test_repo_clone_runner</code></pre>
<h2 id="the-components">组件</h2>
<h3 id="the-repository-observer-repo_observer.py">代码库监听器( <code>repo_observer.py</code> )</h3>
<p>监听器的任务是监听代码库中的改动,并在发现改动是通知测试样例分配器。为了保证我们的CI系统与各种版本控制系统(并不是所有的VCS都有内置的通知系统)都能够兼容,我们设定CI系统定时检查代码库是否有新的提交,而不是等待VCS在代码提交时发送通知。</p>
<p>监听器会定时轮询代码库,当有新的提交时,监听器会向分配器推送需要运行测试的代码的版本ID。监听器的轮询过程是:首先,在监听器的储存空间中得到当前的提交版本;其次,将本地库更新至这个版本;最后,将这个版本与远程库最近一次的提交ID进行比对。这样,监听器中本地的当前版本与远程的最新版本不一致时就判定为发生了新的提交。在我们的CI系统中,监听器只会向分配器推送最近的一次提交。这意味着,如果在一次的轮询周期内发生了两次提交,监听器只会为最近的一次运行测试。通常来讲,CI系统会为自上一次更新以来的每一次的提交运行相应的测试。但是为了简单起见,这次我们搭建的CI系统采取了仅为最后一次提交运行测试的方案。</p>
<p>监听器必须清楚自己监听的到底是哪一个代码库,我们之前已经在<code>/path/to/test_repo_clone_obs</code>建立了一份用于监听的代码拷贝。我们的监听器会使用这份拷贝进行检测。为了监听器能够找到这份拷贝,我们在调用<code>repo_observer.py</code>时会传入这一份代码拷贝的路径。监听器会以这份拷贝为本地代码库从主仓库中拉取最新的代码。</p>
<p>监听器会利用这份拷贝从主仓库中拉取最新的代码。在运行监听器时,可以通过命令行参数<code>--dispatcher-server</code>来传递分配器的地址。如果不手动传入地址,分配器的默认地址取值为:<code>localhost:8888</code> 。</p>
<pre class="sourceCode python"><code class="sourceCode python"><span class="kw">def</span> poll():
parser = argparse.ArgumentParser()
parser.add_argument(<span class="st">"--dispatcher-server"</span>,
<span class="dt">help</span>=<span class="st">"dispatcher host:port, "</span> \
<span class="co">"by default it uses localhost:8888"</span>,
default=<span class="st">"localhost:8888"</span>,
action=<span class="st">"store"</span>)
parser.add_argument(<span class="st">"repo"</span>, metavar=<span class="st">"REPO"</span>, <span class="dt">type</span>=<span class="dt">str</span>,
<span class="dt">help</span>=<span class="st">"path to the repository this will observe"</span>)
args = parser.parse_args()
dispatcher_host, dispatcher_port = args.dispatcher_server.split(<span class="st">":"</span>)</code></pre>
<p>当运行监听器脚本时,会直接从<code>poll()</code> 函数开始运行。这个函数会解析命令行的参数,并开始一个无限循环。这个while循环会定期的检查代码库的变化。这个循环中所做的第一个工作就是运行Bash脚本<code>update_repo.sh</code> <a class="footnoteRef" href="http://aosabook.org/en/500L/pages/a-continuous-integration-system.html#fn1" id="fnref1"><sup>1</sup></a> 。</p>
<pre class="sourceCode python"><code class="sourceCode python"> <code class="sourceCode python"><span class="kw">while</span> <span class="ot">True</span>:
<span class="kw">try</span>:
<span class="co"># call the bash script that will update the repo and check</span>
<span class="co"># for changes. If there's a change, it will drop a .commit_id file</span>
<span class="co"># with the latest commit in the current working directory</span>
subprocess.check_output([<span class="st">"./update_repo.sh"</span>, args.repo])
<span class="kw">except</span> subprocess.CalledProcessError <span class="ch">as</span> e:
<span class="kw">raise</span> <span class="ot">Exception</span>(<span class="st">"Could not update and check repository. "</span> +
<span class="co">"Reason: %s"</span> % e.output)</code></pre>
<p><code>update_repo.sh</code>用于识别新的提交并通知监听器。它首先记录当前所在的提交ID,然后拉取最新的代码,接着检查最新的提交ID。如果当前的版本ID与最新的匹配,说明代码没有变动,所以监听器不会作出任何响应。但是,如果提交ID间存在不同,就意味着有新了新的提交。这时,<code>update_repo.sh</code>将创建一个名为<code>.commit_id</code>的文件来记录最新的提交ID。</p>
<p><code>update_repo.sh</code>的细分步骤如下:首先,我们的脚本会执行<code>run_or_fail.sh</code>文件,该文件提供了一些基于shell脚本的辅助函数。通过这些函数我们可以运行指定的脚本并可以在运行出错时输出错误信息。</p>
<pre class="sourceCode bash"><code class="sourceCode bash"><span class="co">#!/bin/bash</span>
<span class="kw">source</span> run_or_fail.sh </code></pre>
<p>接下来,我们的脚本会试图删除<code>.commit_id</code>文件。
因为<code>repo_observer.py</code>会再无限循环中不断的调用<code>updaterepo.sh</code> ,如果在上一次的调用中产生了<code>.commit_id</code> 文件,并且其中储存的版本ID我们在上一次轮询中已经完成了测试,就会造成混乱。所以我们在每次都会先删除上一次的<code>.commit_id</code>文件,以免产生混乱。</p>
<pre class="sourceCode bash"><code class="sourceCode bash"><span class="kw">bash</span> rm -f .commit_id </code></pre>
<p>在删除了文件之后(在文件已经存在的情况下),脚本会检查我们监听的代码库是否存在,再把.commit_id更新到最近的一次提交,保证.commit_id文件与代码库提交ID之间的同步。</p>
<pre class="sourceCode bash"><code class="sourceCode bash"><span class="kw">run_or_fail</span> <span class="st">"Repository folder not found!"</span> pushd <span class="ot">$1</span> <span class="kw">1></span> /dev/null
<span class="kw">run_or_fail</span> <span class="st">"Could not reset git"</span> git reset --hard HEAD</code></pre>
<p>再之后,读取git的日志,将其中最后一次的提交ID解析出来。</p>
<pre class="sourceCode bash"><code class="sourceCode bash"><span class="ot">COMMIT=$(</span><span class="kw">run_or_fail</span> <span class="st">"Could not call 'git log' on repository"</span> git log -n1<span class="ot">)</span>
<span class="kw">if [</span> <span class="ot">$?</span> <span class="ot">!=</span> 0<span class="kw"> ]</span>; <span class="kw">then</span>
<span class="kw">echo</span> <span class="st">"Could not call 'git log' on repository"</span>
<span class="kw">exit</span> 1
<span class="kw">fi</span>
<span class="ot">COMMIT_ID=</span><span class="kw">`echo</span> <span class="ot">$COMMIT</span> <span class="kw">|</span> <span class="kw">awk</span> <span class="st">'{ print $2 }'</span><span class="kw">`</span></code></pre>
<p>接下来,拉取代码库,获取最近所有的更改,并得到最新的提交ID。</p>
<pre class="sourceCode bash"><code class="sourceCode bash"><span class="kw">run_or_fail</span> <span class="st">"Could not pull from repository"</span> git pull
<span class="ot">COMMIT=$(</span><span class="kw">run_or_fail</span> <span class="st">"Could not call 'git log' on repository"</span> git log -n1<span class="ot">)</span>
<span class="kw">if [</span> <span class="ot">$?</span> <span class="ot">!=</span> 0<span class="kw"> ]</span>; <span class="kw">then</span>
<span class="kw">echo</span> <span class="st">"Could not call 'git log' on repository"</span>
<span class="kw">exit</span> 1
<span class="kw">fi</span>
<span class="ot">NEW_COMMIT_ID=</span><span class="kw">`echo</span> <span class="ot">$COMMIT</span> <span class="kw">|</span> <span class="kw">awk</span> <span class="st">'{ print $2 }'</span><span class="kw">`</span></code></pre>
<p>最后,如果新得到的提交ID与上一次的ID不匹配,我们就知道在两次轮询间发生了新的提交,所以我们的脚本应该将新的提交ID储存在.commit_id文件中。</p>
<pre class="sourceCode bash"><code class="sourceCode bash"><span class="co"># if the id changed, then write it to a file</span>
<span class="kw">if [</span> <span class="ot">$NEW_COMMIT_ID</span> <span class="ot">!=</span> <span class="ot">$COMMIT_ID</span><span class="kw"> ]</span>; <span class="kw">then</span>
<span class="kw">popd</span> <span class="kw">1></span> /dev/null
<span class="kw">echo</span> <span class="ot">$NEW_COMMIT_ID</span> <span class="kw">></span> .commit_id
<span class="kw">fi</span></code></pre>
<p>当<code>repo_observer.py</code>运行<code>update_repo.sh</code>完之后,监听器会检查<code>.commit_id</code>是否存在。如果文件存在,我们就知道在上一次的轮询后又发生了新的提交,我们需要通知测试样例调度器来开始测试。监听器会通过连接并发送一个'status'请求来检查调度器服务的运行状态,以保证它处在可以正常接受指令的状态正常工作状态。</p>
<pre class="sourceCode python"><code class="sourceCode python"> <span class="kw">if</span> os.path.isfile(<span class="st">".commit_id"</span>):
<span class="kw">try</span>:
response = helpers.communicate(dispatcher_host,
<span class="dt">int</span>(dispatcher_port),
<span class="st">"status"</span>)
<span class="kw">except</span> socket.error <span class="ch">as</span> e:
<span class="kw">raise</span> <span class="ot">Exception</span>(<span class="st">"Could not communicate with dispatcher server: </span><span class="ot">%s</span><span class="st">"</span> % e)</code></pre>
<p>如果它以“OK”响应,则监听器会打开<code>.commit_id</code>文件,读取最新的提交ID并使用<code>dispatch:<commit ID></code>请求将该ID发送给调度程序,监听器会每个5秒发送一次指令。如果发生任何错误,监听器同样会每隔5s进行一次重试。</p>
<pre class="sourceCode python"><code class="sourceCode python"> <code class="sourceCode python"><span class="kw">if</span> response == <span class="st">"OK"</span>:
commit = <span class="st">""</span>
<span class="kw">with</span> <span class="dt">open</span>(<span class="st">".commit_id"</span>, <span class="st">"r"</span>) <span class="ch">as</span> f:
commit = f.readline()
response = helpers.communicate(dispatcher_host,
<span class="dt">int</span>(dispatcher_port),
<span class="st">"dispatch:</span><span class="ot">%s</span><span class="st">"</span> % commit)
<span class="kw">if</span> response != <span class="st">"OK"</span>:
<span class="kw">raise</span> <span class="ot">Exception</span>(<span class="st">"Could not dispatch the test: </span><span class="ot">%s</span><span class="st">"</span> %
response)
<span class="dt">print</span> <span class="st">"dispatched!"</span>
<span class="kw">else</span>:
<span class="kw">raise</span> <span class="ot">Exception</span>(<span class="st">"Could not dispatch the test: </span><span class="ot">%s</span><span class="st">"</span> %
response)
time.sleep(<span class="dv">5</span>)</code></pre>
<p>如果你不使用<code>KeyboardInterrupt</code> (Ctrl + c)终止监听器发送进程或发送终止信号,监听器会永远重复这一操作。</p>
<h3 id="the-dispatcher-dispatcher.py">测试样例分配器( <code>dispatcher.py</code> )</h3>
<p>测试样例分配器是一个用来为测试运行器分配测试任务的独立进程。他会在一个指定端口监听来自代码库监听器及测试运行器的请求。分配器允许测试运行器主动注册,当监听器发送一个提交ID时,它会将测试工作分配给一个已经注册的测试运行器。同时,它还可以平稳的处理测试运行器遇到的各种问题,当一个运行器发生故障,它可以立即将该运行器运行测试的提交ID重新分配给一个新的测试运行器。</p>
<p><code>dispatch.py</code> 脚本以<code>serve</code>函数为入口。首先,它会解析你设定的分配器的地址及端口:</p>
<pre class="sourceCode python"><code class="sourceCode python"><span class="kw">def</span> serve():
parser = argparse.ArgumentParser()
parser.add_argument(<span class="st">"--host"</span>,
<span class="dt">help</span>=<span class="st">"dispatcher's host, by default it uses localhost"</span>,
default=<span class="st">"localhost"</span>,
action=<span class="st">"store"</span>)
parser.add_argument(<span class="st">"--port"</span>,
<span class="dt">help</span>=<span class="st">"dispatcher's port, by default it uses 8888"</span>,
default=<span class="dv">8888</span>,
action=<span class="st">"store"</span>)
args = parser.parse_args()</code></pre>
<p>这里我们会开启分配器进程以及一个<code>runner_checker</code>函数进程,和一个<code>redistribute</code>函数进程。</p>
<pre class="sourceCode python"><code class="sourceCode python"> server = ThreadingTCPServer((args.host, <span class="dt">int</span>(args.port)), DispatcherHandler)
<span class="dt">print</span> `serving on %s:%s` % (args.host, <span class="dt">int</span>(args.port))
...
runner_heartbeat = threading.Thread(target=runner_checker, args=(server,))
redistributor = threading.Thread(target=redistribute, args=(server,))
<span class="kw">try</span>:
runner_heartbeat.start()
redistributor.start()
<span class="co"># Activate the server; this will keep running until you</span>
<span class="co"># interrupt the program with Ctrl+C or Cmd+C</span>
server.serve_forever()
<span class="kw">except</span> (<span class="ot">KeyboardInterrupt</span>, <span class="ot">Exception</span>):
<span class="co"># if any exception occurs, kill the thread</span>
server.dead = <span class="ot">True</span>
runner_heartbeat.join()
redistributor.join()</code></pre>
<p><code>runner_checker</code>函数会定期的ping每一个注册在位的运行器,来确保他们都处于正常工作的状态。。如果有运行器没有响应,该函数就会将其从注册的运行器池中删除,并且之前分配给他的提交ID会被重新分配给一个新的可用的运行器。函数会在<code>pending_commits</code>变量中记录运行受到运行器失去响应影响的提交ID</p>
<pre class="sourceCode python"><code class="sourceCode python"> <code class="sourceCode python"><span class="kw">def</span> runner_checker(server):
<span class="kw">def</span> manage_commit_lists(runner):
<span class="kw">for</span> commit, assigned_runner in server.dispatched_commits.iteritems():
<span class="kw">if</span> assigned_runner == runner:
<span class="kw">del</span> server.dispatched_commits[commit]
server.pending_commits.append(commit)
<span class="kw">break</span>
server.runners.remove(runner)
<span class="kw">while</span> not server.dead:
time.sleep(<span class="dv">1</span>)
<span class="kw">for</span> runner in server.runners:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
<span class="kw">try</span>:
response = helpers.communicate(runner[<span class="st">"host"</span>],
<span class="dt">int</span>(runner[<span class="st">"port"</span>]),
<span class="st">"ping"</span>)
<span class="kw">if</span> response != <span class="st">"pong"</span>:
<span class="dt">print</span> <span class="st">"removing runner </span><span class="ot">%s</span><span class="st">"</span> % runner
manage_commit_lists(runner)
<span class="kw">except</span> socket.error <span class="ch">as</span> e:
manage_commit_lists(runner)</code></pre>
<p><code>redistribute</code>函数用来将<code>pending_commits</code>中记录的提交ID进行分发。当<code>redistribute</code>运行时,它会不断的检查<code>pending_commits</code>中是否有提交ID。一旦发现<code>pending_commits</code>中存在提交ID,函数会调用<code>dispatch_tests</code>方法来将提交ID分配出去。</p>
<pre class="sourceCode python"><code class="sourceCode python"> <span class="kw">def</span> redistribute(server):
<span class="kw">while</span> not server.dead:
<span class="kw">for</span> commit in server.pending_commits:
<span class="dt">print</span> <span class="st">"running redistribute"</span>
<span class="dt">print</span> server.pending_commits
dispatch_tests(server, commit)
time.sleep(<span class="dv">5</span>)</code></pre>
<p><code>dispatch_tests</code>函数用来从已注册的运行器池中返回一个可用的运行器。如果得到了一个可用的运行器,函数会发送一个带有提交ID的运行测试指令。如果当前没有可用的运行器,函数会在2s的休眠之后重复上述过程。如果分配成功了,函数会在<code>dispatched_commits</code>变量中记录提交ID及该提交ID的测试正在由哪一个运行器运行。如果提交ID在<code>pending_commits</code>变量中,<code>dispatch_tests</code>函数会在重新分配后将提交ID从<code>pending_commits</code>中删除。</p>
<pre class="sourceCode python"><code class="sourceCode python"><span class="kw">def</span> dispatch_tests(server, commit_id):
<span class="co"># NOTE: usually we don't run this forever</span>
<span class="kw">while</span> <span class="ot">True</span>:
<span class="dt">print</span> <span class="st">"trying to dispatch to runners"</span>
<span class="kw">for</span> runner in server.runners:
response = helpers.communicate(runner[<span class="st">"host"</span>],
<span class="dt">int</span>(runner[<span class="st">"port"</span>]),
<span class="st">"runtest:</span><span class="ot">%s</span><span class="st">"</span> % commit_id)
<span class="kw">if</span> response == <span class="st">"OK"</span>:
<span class="dt">print</span> <span class="st">"adding id </span><span class="ot">%s</span><span class="st">"</span> % commit_id
server.dispatched_commits[commit_id] = runner
<span class="kw">if</span> commit_id in server.pending_commits:
server.pending_commits.remove(commit_id)
<span class="kw">return</span>
time.sleep(<span class="dv">2</span>)</code></pre>
<p>分配器服务用到了标准库中的一个叫<code>SocketServer</code>模块,它是一个非常简单的服务器。<code>SocketServer</code>模块中有四种基本的服务器类型: <code>TCP</code> , <code>UDP</code> , <code>UnixStreamServer</code>和<code>UnixDatagramServer</code> 。为了保证我们的数据传输连续稳定,我们使用基于TCP协议的套接字(UPD并不能保证数据的稳定和连续)。</p>
<p><code>SocketServer</code>中提供的默认的<code>TCPServer</code>同时只能处理一个请求,因此当分配器与一个运行器建立会话后,就无法再同时与监听器建立连接了。此时来自监听器的会话只能等待第一个会话完成并断开连接才能建立与分配器的连接。这对于我们的项目而言并不是非常理想,在我们预想中,分配器应该直接而迅速的同时与所有运行器及监听器进行通信。</p>
<p>为了使我们的分配器可以同时维护多个连接,我们使用了一个自定义的类<code>ThreadingTCPServer</code>来为默认的<code>SocketServer</code> 类增加多线程运行的功能。也就是说无论何时分配器接收到连接请求,他都会新建一个进程来处理这个会话。这就使分配器同时维护多个连接成为了可能。</p>
<pre class="sourceCode python"><code class="sourceCode python"><span class="kw">class</span> ThreadingTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
runners = [] <span class="co"># Keeps track of test runner pool</span>
dead = <span class="ot">False</span> <span class="co"># Indicate to other threads that we are no longer running</span>
dispatched_commits = {} <span class="co"># Keeps track of commits we dispatched</span>
pending_commits = [] <span class="co"># Keeps track of commits we have yet to dispatch</span></code></pre>
<p>分配器会为每一个请求定义了处理函数。我们通过<code>DispatcherHandler</code>类定义来实现这种功能。<code>DispatcherHandler</code>继承自<code>SocketServer</code>的<code>BaseRequestHandler</code> 基类。这个基类只需要我们定义句柄函数,只要请求连接它就会被调用。我们将这个函数的自定义内容写在<code>DispatcherHandler</code>中,并且确保在每一次请求出现是,这个函数能够被调用。这个函数会不断地监听发来的请求( <code>self.request</code>中保存着请求信息),并解析请求中的指令。</p>
<pre class="sourceCode python"><code class="sourceCode python"><span class="kw">class</span> DispatcherHandler(SocketServer.BaseRequestHandler):
<span class="co">"""</span>
<span class="co"> The RequestHandler class for our dispatcher.</span>
<span class="co"> This will dispatch test runners against the incoming commit</span>
<span class="co"> and handle their requests and test results</span>
<span class="co"> """</span>
command_re = re.<span class="dt">compile</span>(<span class="st">r"(\w+)(:.+)*"</span>)
BUF_SIZE = <span class="dv">1024</span>
<span class="kw">def</span> handle(<span class="ot">self</span>):
<span class="ot">self</span>.data = <span class="ot">self</span>.request.recv(<span class="ot">self</span>.BUF_SIZE).strip()
command_groups = <span class="ot">self</span>.command_re.match(<span class="ot">self</span>.data)
<span class="kw">if</span> not command_groups:
<span class="ot">self</span>.request.sendall(<span class="st">"Invalid command"</span>)
<span class="kw">return</span>
command = command_groups.group(<span class="dv">1</span>)</code></pre>
<p>这个函数可以处理如下指令: <code>status</code> , <code>register</code> , <code>dispatch</code>和<code>results</code> 。其中 <code>status</code>函数用来检测分配器服务是否处于运行状态。</p>
<pre class="sourceCode python"><code class="sourceCode python"> <span class="kw">if</span> command == <span class="st">"status"</span>:
<span class="dt">print</span> <span class="st">"in status"</span>
<span class="ot">self</span>.request.sendall(<span class="st">"OK"</span>)</code></pre>
<p>为了让分配器的功能生效,我们需要注册至少一个运行器。当注册器被调用时,为了确保在需要发送提交ID触发测试时能准确的找到对应的运行器,会在一个列表中保存下运行器的“地址:端口”数据(运行器的数据会被保存在一个叫<code>ThreadingTCPServer</code>的对象中)。</p>
<pre class="sourceCode python"><code class="sourceCode python"> <span class="kw">elif</span> command == <span class="st">"register"</span>:
<span class="co"># Add this test runner to our pool</span>
<span class="dt">print</span> <span class="st">"register"</span>
address = command_groups.group(<span class="dv">2</span>)
host, port = re.findall(<span class="st">r":(\w*)"</span>, address)
runner = {<span class="st">"host"</span>: host, <span class="st">"port"</span>:port}
<span class="ot">self</span>.server.runners.append(runner)
<span class="ot">self</span>.request.sendall(<span class="st">"OK"</span>)</code></pre>
<p><code>dispatch</code>指令用于代码库监听器下发测试的提交ID给测试运行器。此命令的用法为<code>dispatch:<commit ID></code> 。分配器从此消息中解析出提交ID并将其发送给测试运行器。</p>
<pre class="sourceCode python"><code class="sourceCode python"> <span class="kw">elif</span> command == <span class="st">"dispatch"</span>:
<span class="dt">print</span> <span class="st">"going to dispatch"</span>
commit_id = command_groups.group(<span class="dv">2</span>)[<span class="dv">1</span>:]
<span class="kw">if</span> not <span class="ot">self</span>.server.runners:
<span class="ot">self</span>.request.sendall(<span class="st">"No runners are registered"</span>)
<span class="kw">else</span>:
<span class="co"># The coordinator can trust us to dispatch the test</span>
<span class="ot">self</span>.request.sendall(<span class="st">"OK"</span>)
dispatch_tests(<span class="ot">self</span>.server, commit_id)</code></pre>
<p><code>results</code>指令会由测试运行器在上报测试结果是调用。此命令的用法为<code>results:<commit ID>:<length of results data in bytes>:<results></code> 。<code><commit ID></code>用于标识测试报告对应的提交ID。<code><length of results data in bytes></code>用于计算结果数据使用需要多大的缓冲区。最后, <code><results></code>中是实际报告信息。</p>
<pre class="sourceCode python"><code class="sourceCode python"> <code class="sourceCode python"><span class="kw">elif</span> command == <span class="st">"results"</span>:
<span class="dt">print</span> <span class="st">"got test results"</span>
results = command_groups.group(<span class="dv">2</span>)[<span class="dv">1</span>:]
results = results.split(<span class="st">":"</span>)
commit_id = results[<span class="dv">0</span>]
length_msg = <span class="dt">int</span>(results[<span class="dv">1</span>])
<span class="co"># 3 is the number of ":" in the sent command</span>
remaining_buffer = <span class="ot">self</span>.BUF_SIZE - \
(<span class="dt">len</span>(command) + <span class="dt">len</span>(commit_id) + <span class="dt">len</span>(results[<span class="dv">1</span>]) + <span class="dv">3</span>)
<span class="kw">if</span> length_msg > remaining_buffer:
<span class="ot">self</span>.data += <span class="ot">self</span>.request.recv(length_msg - remaining_buffer).strip()
<span class="kw">del</span> <span class="ot">self</span>.server.dispatched_commits[commit_id]
<span class="kw">if</span> not os.path.exists(<span class="st">"test_results"</span>):
os.makedirs(<span class="st">"test_results"</span>)
<span class="kw">with</span> <span class="dt">open</span>(<span class="st">"test_results/</span><span class="ot">%s</span><span class="st">"</span> % commit_id, <span class="st">"w"</span>) <span class="ch">as</span> f:
data = <span class="ot">self</span>.data.split(<span class="st">":"</span>)[<span class="dv">3</span>:]
data = <span class="st">"</span><span class="ch">\n</span><span class="st">"</span>.join(data)
f.write(data)
<span class="ot">self</span>.request.sendall(<span class="st">"OK"</span>)</code></pre>
<h3 id="the-test-runner-test_runner.py">测试运行器( <code>test_runner.py</code> )</h3>
<p>测试运行器负责对给定的提交ID运行测试,并上报测试结果。它仅会与分配器通信,分配器负责为其提供需要运行测试的提交ID,并且会接收测试结果报告。</p>
<p><code>test_runner.py</code>文件会以启动测试运行器服务的<code>serve</code>函数为入口,并启动一个线程来运行<code>dispatcher_checker</code>函数。由于此启动过程与<code>repo_observer.py</code>和<code>dispatcher.py</code>的启动过程非常相似,因此我们在这里就不再赘述。</p>
<p><code>dispatcher_checker</code>函数每五秒对分配器执行一次ping操作,以确保它仍然在正常运行。这个操作主要是出于资源管理上的考虑。如果对应的分配器挂了,就需要将测试运行器也关闭。 否则测试运行器就只能空跑,及接收不到新的任务也无法提交之前任务产生报告。</p>
<pre class="sourceCode python"><code class="sourceCode python"> <code class="sourceCode python"><span class="kw">def</span> dispatcher_checker(server):
<span class="kw">while</span> not server.dead:
time.sleep(<span class="dv">5</span>)
<span class="kw">if</span> (time.time() - server.last_communication) > <span class="dv">10</span>:
<span class="kw">try</span>:
response = helpers.communicate(
server.dispatcher_server[<span class="st">"host"</span>],
<span class="dt">int</span>(server.dispatcher_server[<span class="st">"port"</span>]),
<span class="st">"status"</span>)
<span class="kw">if</span> response != <span class="st">"OK"</span>:
<span class="dt">print</span> <span class="st">"Dispatcher is no longer functional"</span>
server.shutdown()
<span class="kw">return</span>
<span class="kw">except</span> socket.error <span class="ch">as</span> e:
<span class="dt">print</span> <span class="st">"Can't communicate with dispatcher: </span><span class="ot">%s</span><span class="st">"</span> % e
server.shutdown()
<span class="kw">return</span></code></pre>
<p>测试运行器的服务于分配器相同都是<code>ThreadingTCPServer</code> ,它需要多线程运行的是因为分配器既会向它下发提交ID,也可能在测试运行的期间的ping它。</p>
<pre class="sourceCode python"><code class="sourceCode python"><span class="kw">class</span> ThreadingTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
dispatcher_server = <span class="ot">None</span> <span class="co"># Holds the dispatcher server host/port information</span>
last_communication = <span class="ot">None</span> <span class="co"># Keeps track of last communication from dispatcher</span>
busy = <span class="ot">False</span> <span class="co"># Status flag</span>
dead = <span class="ot">False</span> <span class="co"># Status flag</span></code></pre>
<p>整个通信流是从分配器向测试运行器发送需要运行测试的提交ID开始的。如果测试运行器的状态可以运行测试,它会发送确认消息回分配器,然后关闭第一个连接。为了使测试运行器在跑测试的同时还能接受来自分配器的其他请求,它会单独启动一个进程来运行测试。</p>
<p>这样,当分配器在测试运行器正在运行测试的时候发来一个请求(比如一个ping请求), 测试运行器的测试跑在另一个进程上,运行器服务本身仍然可以在作出响应。这样测试运行器就可支持同时运行多个任务了。还有一种替代多线程运行的设计是在分配器与测试运行器间建立一个长连接。 但这样会在分配器端消耗大量的内存来维持连接,另外这种方式还对网络有强依赖。 如果网络一旦产生波动(比如突然的断线)就会对系统造成破坏。</p>
<p>测试运行器会从分配器接收到两种消息。 第一种是<code>ping</code>消息 ,分配器用这个消息来验证测试运行器是否仍处于活跃状态。</p>
<pre class="sourceCode python"><code class="sourceCode python"><span class="kw">class</span> TestHandler(SocketServer.BaseRequestHandler):
...
<span class="kw">def</span> handle(<span class="ot">self</span>):
....
<span class="kw">if</span> command == <span class="st">"ping"</span>:
<span class="dt">print</span> <span class="st">"pinged"</span>
<span class="ot">self</span>.server.last_communication = time.time()
<span class="ot">self</span>.request.sendall(<span class="st">"pong"</span>)</code></pre>
<p>另一个是 <code>runtest</code> ,它的格式是<code>runtest:<commit ID></code> 。这条指令用于分配器下发需要测试的提交ID。当接收到runtest时,测试运行器将检查当前是否有正在运行的测试。如果有,它会给分配器返回<code>BUSY</code>的响应。如果没有,它会返回<code>OK</code>,将其状态设置为busy并运行其<code>run_tests</code>函数。</p>
<pre class="sourceCode python"><code class="sourceCode python"> <code class="sourceCode python"><span class="kw">elif</span> command == <span class="st">"runtest"</span>:
<span class="dt">print</span> <span class="st">"got runtest command: am I busy? </span><span class="ot">%s</span><span class="st">"</span> % <span class="ot">self</span>.server.busy
<span class="kw">if</span> <span class="ot">self</span>.server.busy:
<span class="ot">self</span>.request.sendall(<span class="st">"BUSY"</span>)
<span class="kw">else</span>:
<span class="ot">self</span>.request.sendall(<span class="st">"OK"</span>)
<span class="dt">print</span> <span class="st">"running"</span>
commit_id = command_groups.group(<span class="dv">2</span>)[<span class="dv">1</span>:]
<span class="ot">self</span>.server.busy = <span class="ot">True</span>
<span class="ot">self</span>.run_tests(commit_id,
<span class="ot">self</span>.server.repo_folder)
<span class="ot">self</span>.server.busy = <span class="ot">False</span></code></pre>
<p>这个函数会调用一个叫<code>test_runner_script.sh</code> 的shell脚本,该脚本会将代码更新为给定的提交ID。脚本返回后,如果代码库已经被成功的更新,运行器会使用unittest运行测试并将结果收集到一个文件中。测试运行完毕后,测试运行器将读入结果报告文件,并将报告发送给调度程序。</p>
<pre class="sourceCode python"><code class="sourceCode python"> <span class="kw">def</span> run_tests(<span class="ot">self</span>, commit_id, repo_folder):
<span class="co"># update repo</span>
output = subprocess.check_output([<span class="st">"./test_runner_script.sh"</span>,
repo_folder, commit_id])
<span class="dt">print</span> output
<span class="co"># run the tests</span>
test_folder = os.path.join(repo_folder, <span class="st">"tests"</span>)
suite = unittest.TestLoader().discover(test_folder)
result_file = <span class="dt">open</span>(<span class="st">"results"</span>, <span class="st">"w"</span>)
unittest.TextTestRunner(result_file).run(suite)
result_file.close()
result_file = <span class="dt">open</span>(<span class="st">"results"</span>, <span class="st">"r"</span>)
<span class="co"># give the dispatcher the results</span>
output = result_file.read()
helpers.communicate(<span class="ot">self</span>.server.dispatcher_server[<span class="st">"host"</span>],
<span class="dt">int</span>(<span class="ot">self</span>.server.dispatcher_server[<span class="st">"port"</span>]),
<span class="st">"results:</span><span class="ot">%s</span><span class="st">:</span><span class="ot">%s</span><span class="st">:</span><span class="ot">%s</span><span class="st">"</span> % (commit_id, <span class="dt">len</span>(output), output))</code></pre>
<p>这是<code>test_runner_script.sh</code>的内容 :</p>
<pre class="sourceCode bash"><code class="sourceCode bash"><span class="co">#!/bin/bash</span>
<span class="ot">REPO=$1</span>
<span class="ot">COMMIT=$2</span>
<span class="kw">source</span> run_or_fail.sh
<span class="kw">run_or_fail</span> <span class="st">"Repository folder not found"</span> pushd <span class="st">"</span><span class="ot">$REPO</span><span class="st">"</span> <span class="kw">1></span> /dev/null
<span class="kw">run_or_fail</span> <span class="st">"Could not clean repository"</span> git clean -d -f -x
<span class="kw">run_or_fail</span> <span class="st">"Could not call git pull"</span> git pull
<span class="kw">run_or_fail</span> <span class="st">"Could not update to given commit hash"</span> git reset --hard <span class="st">"</span><span class="ot">$COMMIT</span><span class="st">"</span></code></pre>
<p>要运行<code>test_runner.py</code> ,必须将其指向存储库的副本。你可以使用我们先前创建的<code>/path/to/test_repo test_repo_clone_runner</code> 副本作为启动参数。默认情况下, <code>test_runner.py</code>将在localhost的8900-9000端口上启动,并尝试连接到<code>localhost:8888</code>上的调度程序服务器。你通过可以一些可选参数来更改这些值。<code>--host</code>和<code>--port</code>参数用于指定运行测试运行器服务器地址和端口, <code>--dispatcher-server</code>参数指定调度程序的地址。</p>
<h3 id="control-flow-diagram">控制流程图</h3>
<p><a href="http://aosabook.org/en/500L/pages/a-continuous-integration-system.html#figure-2.1">图2.1</a>是该系统的概述图。图中假设所有三个文件( <code>repo_observer.py</code> , <code>dispatcher.py</code>和<code>test_runner.py</code> )都已在运行,并描述了每个进程在新的提交发生时所采取的操作。</p>
<div class="center figure">
<a name="figure-2.1"></a><img src="./image/diagram.png" alt="图2.1 - 控制流程" title="图2.1 - 控制流程">
</div>
<p class="center figcaption">
<small>图2.1 - 控制流程</small>
</p>
<h3 id="running-the-code">运行代码</h3>
<p>我们可以在本地运行这个简单的CI系统,为每个进程使用不同的终端shell。我们首先启动分配器,它默认运行在端口8888上:</p>
<pre class="sourceCode bash"><code class="sourceCode bash">$ <span class="kw">python</span> dispatcher.py</code></pre>
<p>开一个新的的shell,我们启动测试运行器(这样它就可以在分配器中注册了):</p>
<pre class="sourceCode bash"><code class="sourceCode bash">$ <span class="kw">python</span> test_runner.py <span class="kw"><</span>path/to/test_repo_clone_runner<span class="kw">></span></code></pre>
<p>测试运行器将自动为自己分配端口,范围为8900-9000。你可以根据需求尽可能多起几个测试运行器。</p>
<p>最后,在另一个新shell中,让我们启动代码库监听器:</p>
<pre class="sourceCode bash"><code class="sourceCode bash">$ <span class="kw">python</span> repo_observer.py --dispatcher-server=localhost:8888 <span class="kw"><</span>path/to/repo_clone_obs<span class="kw">></span></code></pre>
<p>现在万事俱备,让我们触发一些测试吧玩一下吧!根据设计我们需要创建一个新的提交来触发测试。切换到你的主代码仓库中, 随便改点什么:</p>
<pre class="sourceCode bash"><code class="sourceCode bash">$ <span class="kw">cd</span> /path/to/test_repo
$ <span class="kw">touch</span> new_file
$ <span class="kw">git</span> add new_file
$ <span class="kw">git</span> commit -m<span class="st">"new file"</span> new_file</code></pre>
<p>然后<code>repo_observer.py</code>识别到有一个新的提交产生了,之后通知分配器。你可以在它们各自的shell窗口中查看它们的运行日志。当分配器收到测试结果,它就会将它们存储在此代码库中的<code>test_results/</code>文件夹中,并使用提交ID作为文件名。</p>
<h2 id="error-handling">错误处理</h2>
<p>该CI系统中包括一些简单的错误处理。</p>
<p>如果你讲<code>test_runner.py</code>进程杀掉, <code>dispatcher.py</code>将确定该运行器将会识别出这个节点已经不再活跃,并将其从运行器池中移除。</p>
<p>你也可以模拟网络或系统故障,在测试运行器执行测试的时候将它杀死。这时,分配器会识别到运行器已经挂了,它会将挂掉的运行器从运行器池中移除,并将这个运行器之前在执行的任务分配给池中其他的运行器。</p>
<p>如果你杀掉分配器,那么监听器会直接报错。测试运行器也会发现分配器不再运行,并自动关闭。</p>
<h2 id="conclusion">结论</h2>
<p>通过逐个分析各个进程中的不同功能,我们对构建了一个分布式的持续集成系统的有了一些基本的认识。通过套接字请求实现进程间的通信,我们的CI系统可以分布式的运行在不同的机器上,这增强了我们的系统可靠性和可扩展性。</p>
<p>这套CI系统现在的功能仍然非常简单,你还可以发挥自己的才能对它进行各种扩展以实现更多功能。以下是一些改进建议:</p>
<h3 id="per-commit-test-runs">对每次提交自动运行测试</h3>
<p>当前系统将定期检查是否有新的提交并对最近的一次提交运行测试。这个设计可以改为每次提交都触发测试。你可以修改定期检查程序,获取在两次轮询中发生的所有提交来实现这个功能。</p>
<h3 id="smarter-test-runners">更智能的运行器</h3>
<p>如果测试运行器检测到分配器没有响应,则它将停止运行。当测试运行器正在运行测试时,也会立即关闭!如果测试运行器可以有一段时的等待期或者长期运行(如果你并不在乎它对资源的占用)来等待分配器恢复可能会更好一些。这样当分配器恢复时,运行器既可以将之前执行的测试的报告重新发回分配器。这样可以避免因分配器故障而引起的重复任务,在对每一个提交都执行测试时,这将很大程度上节约运行器资源。</p>
<h3 id="real-reporting">报告展示</h3>
<p>在真正的CI系统中,测试报告一般会发送到一个单独的报告服务。在报告系统中,人们可以查看报告详情,或是设置一些通知规则,在遇到故障或其他一些特殊的情况下通知相关人员。你可以为我的CI系统创建一个独立的报告进程,替换掉分配器的报告收集功能。这个新的进程可以是一个Web服务(或链接到一个Web服务上), 这样我们就可以在网页上直接在线查看测试报告,甚至可以用一个邮件服务器来实现测试失败时的提醒。</p>
<h3 id="test-runner-manager">测试运行器管理器</h3>
<p>在当前的系统中,我们必须手动运行<code>test_runner.py</code>文件来启动测试运行器。你可以创建一个测试运行器管理器进程,通过这个进程来管理查看所有运行器上的负载和来自分配器的请求,对运行器的数量进行相应的调整。这个进程会接受所有的测试任务,根据任务启动测试运行器,并在任务少的时候减少运行器的实例。</p>
<p>遵循这些建议,你可以使这个简单的CI系统更加健壮并且容错率更高,并且具有与其他系统(比如一个网页版的报告查看器)集成的能力。</p>
<p>如果你希望了解现在的持续集成系统可以实现到什么样的灵活性,我建议你去看看<a href="http://jenkins-ci.org/">Jenkins</a> ,这是一个用Java编写的非常强大的开源CI系统。它提供了一个基本的CI系统,同时也允许使用插件进行扩展。你可以<a href="https://github.com/jenkinsci/jenkins/">通过GitHub</a>访问其源代码。另一个推荐的项目是<a href="https://travis-ci.org/">Travis CI</a> ,它是用Ruby编写的,其源代码也可以<a href="https://github.com/travis-ci/travis-ci">通过GitHub</a> 得到。</p>
<p>这是了解CI系统如何工作以及如何自己构建CI系统的尝试。现在你应该对制作一个可靠的分布式系统所需的内容有了更深入的了解,希望你可以利用这些知识开发更复杂的解决方案。</p>
<div class="footnotes">
<hr>
<ol>
<li id="fn1"><p>使用Bash是因为我们需要 检查文件存在,创建文件和使用Git,而shell脚本是实现这一目标的最直接,最简单的方法。当然,你也可以使用一些跨平台的Python包;例如,用的Python内置的<code>os</code>模块访问文件系统,用GitPython访问Git,但是用这些包实现起来稍微有一点不直观。<a href="http://aosabook.org/en/500L/pages/a-continuous-integration-system.html#fnref1">↩</a></p></li>
</ol>
</div>
</div>
</div>
</div>
</body>
</html>