Go基本程序结构

编写测试程序

测试程序:

  • 源码文件以_test结尾:x x x_test.go

  • 测试方法名以Test开头:func TestXXX(t *testing.T) {...}

1
2
3
4
5
6
7
package test

import "testing"

func TestFirstTry(t *testing.T) {
t.Log("My first try!")
}

实现Fibonacci数列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package test

import (
"testing"
)

func TestFibonacciList(t *testing.T) {
//var a int = 1 // 定义变量
//var b int = 1
var (
a int = 1
b int = 1
) // 这样也可以定义变量
// a := 1 直接赋值
// b := 1
t.Log(a)
for i := 0; i < 5; i++ {
t.Log(" ", b)
tmp := a
a = b
b = tmp + a
}
}

变量及常量

变量

  • 赋值可以进行自动类型推断
  • 在一个赋值语句中可以对多个变量进行同时赋值
1
2
3
4
5
6
7
8
9
func TestExchange(t *testing.T) {
a := 1
b := 2
//tmp := a
//a = b
//b = tmp
a, b = b, a // 变量交换
t.Log(a, b)
}

常量 进行快速 设置连续值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
const (
Monday = iota + 1
Tuesday // 2
Wednesday // 3
Thursday // 4
Friday // 5
Saturday // 6
Sunday // 7
)

const (
Readable = 1 << iota
Writable
Executable
)

func TestConstantTry(t *testing.T) {
t.Log(Monday, Tuesday, Wednesday, Thursday, Friday, Saturday, Sunday) // 1 2 3 4 5 6 7
}

func TestConstantTry1(t *testing.T) {
a := 7 // 1111
t.Log(a&Readable == Readable, a&Writable == Writable, a&Executable == Executable) // true true true
}

数据类型

所有类型:

  • bool

  • string

  • int int8 int16 int32 int64

  • uint uint8 uint16 uint32 uint64 uintptr

  • byte // alias for uint8

  • rune // alias for int32,represents a Unicode code point

  • float32 float64

  • Complex64 complex128

与其它主要编程语言的差异:

  • Go语言不允许隐式类型转换

  • 别名和原有类型也不能进行隐式类型转换

类型的预定义值:

  • math.MaxInt64
  • math.MaxFloat64
  • math.MaxUint32

指针类型:

  • 不支持指针运算
  • string是值类型,其默认的初始化值为空字符串,而不是nil

运算符

算术运算符

运算符 描述
+ 相加
- 相减
* 相乘
/ 相除
% 求余
++ 自增
自减

Go语言没有前置的 ++和–

比较运算符

运算符 描述
== 检查两个值是否相等
!= 检查两个值是否不相等
> 检查左边值是否大于右边值
< 检查左边值是否小于右边值
>= 检查左边值是否大于等于右边值
<= 检查左边值是否小于等于右边值

用 == 比较数组:

  • 相同维数且含有相同个数元素的数组才可以比较
  • 每个元素都相同的才想等
1
2
3
4
5
6
7
func TestCompareArray(t *testing.T) {
a := [...]int{1, 2, 3, 4}
b := [...]int{1, 2, 3, 5}
c := [...]int{1, 2, 3, 4}
t.Log(a == b) // false
t.Log(a == c) // true
}

逻辑运算符

运算符 描述
&& 逻辑AND运算符
|| 逻辑OR运算符
! 逻辑NOT运算符

位运算符

运算符 描述
& 参与运算两数各对应的二进位相与
| 参与运算两数各对应的二进位相或
^ 参与运算两数各对应的二进位相异或
<< 左移运算符
>> 右移运算符

&^按位置零

1 &^ 0 – 1

1 &^ 1 – 0

0 &^ 1 – 0

0 &^ 0 – 0

条件和循环

循环

Go语言仅支持循环关键字 for

循环:for (j := 7; j<=9; i++)

条件循环 while(n<5)

1
2
3
4
5
n := 0
for n < 5 {
t.Log(n)
n++
}

无限循环 while(true)

1
2
3
4
5
n := 0
for {
t.Log(n)
n++
}

条件

if条件:

  • condition 表达式结果必须为布尔值
  • 支持变量赋值:
1
2
3
4
5
func TestIfMultiSec(t *testing.T) {
if a := 1 == 1; a {
t.Log(a)
}
}

switch条件:

  • 条件表达式不限制为常量或者整数;
  • 单个case中,可以出现多个结果选项,用逗号分隔;
  • 与C语言等规则相反,Go语言不需要用break来明确退出一个case;
  • 可以不设定switch之后的条件表达式,在此种情况下,整个switch结构与多个if…else..的逻辑作用等同;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func TestSwitchMultiCase(t *testing.T) {
for i := 0; i < 5; i++ {
switch i {
case 0, 2:
t.Log("even")
case 1, 3:
t.Log("odd")
default:
t.Log("it is not 0-3")
}
}
}

func TestSwitchCaseCondition(t *testing.T) {
for i := 0; i < 5; i++ {
switch {
case i%2 == 0:
t.Log("even")
case i%2 == 1:
t.Log("odd")
default:
t.Log("it is not 0-3")
}
}
}

Hello,Go

简简单单了解Go

Go的特点

  • 只有25个关键字
  • 强类型语言
  • 垃圾回收
  • 指针直接访问内存

开发环境构建

  • 1.8之前必须设置
  • 1.8之后没用设置,将使用默认值
  • 扩展名必须为go

编写第一个go程序

hello.go

1
2
3
4
5
6
7
8
9
10
package main // 包,表明代码所在的模块

import (
"fmt"
) // 引入代码依赖

// 功能实现
func main() {
fmt.Println("Hello, World!")
}

两种运行方式:

  • go run
1
2
$ go run hello.go
Hello, World!
  • go build (生成一个可执行文件)
1
2
3
4
5
$ go build hello.go
$ ls
hello hello.go
$ ./hello
Hello, World!

应用程序入口:

  • 必须是mainpackage main
  • 必须是main 方法 func main()
  • 文件名不一定是main.go

退出返回值:

  • Go中main函数不支持任何返回值
  • 通过os.Exit来返回状态

获取命令行参数:

  • Go中main函数不支持任何返回值
  • main函数不支持传入参数 func main(arg []string)
  • 在程序中直接通过os.Args获取命令行参数
1
2
3
4
5
6
7
8
9
10
11
12
13
package main // 包,表明代码所在的模块

import (
"fmt"
"os"
) // 引入代码依赖

// 功能实现
func main() {
fmt.Println("Hello, World!")
fmt.Println(os.Args[0], os.Args[1]) // 默认情况下 参数0返回可执行文件路径
os.Exit(100)
}
1
2
3
4
5
$ go run hello.go gaobinzhan
Hello, World!
参数0:/var/folders/qr/9vkwk7xn5rzbtnmykx7sxyv00000gn/T/go-build024626496/b001/exe/hello
参数1:gaobinzhan
exit status 100

swoole—csp编程模型

协程

不需要操作系统参与,创建销毁和切换的成本非常低,遇到io会自动让出cpu执行权,交给其它协程去执行。

协程执行流程

Swoole协程

非协程代码:

1
2
3
4
5
6
7
<?php
$start = time();
for ($i = 0; $i < 500; $i++) {
file_get_contents('http://www.easyswoole.com/');
echo '任务' . $i . '完成' . PHP_EOL;
}
echo '非协程总耗时' . (time() - $start) . 's' . PHP_EOL;

执行结果:

1
2
3
4
5
6
任务495完成
任务496完成
任务497完成
任务498完成
任务499完成
非协程总耗时13s

协程代码:

1
2
3
4
5
6
7
8
9
<?php
$start = time();
for ($i = 0; $i < 500; $i++) {
Swoole\Coroutine::create(function () use ($i, $start) {
$client = new Swoole\Coroutine\Http\Client('www.easyswoole.com', 80);
$client->get('/');
echo '任务' . $i . '完成' . '耗时' . (time() - $start) . 's' . PHP_EOL;
});
}

执行结果:

1
2
3
4
5
6
7
任务389完成耗时1s
任务395完成耗时1s
任务434完成耗时1s
任务477完成耗时1s
任务469完成耗时1s
任务385完成耗时1s
任务498完成耗时1s

可以发现速度相当快,但任务的id,不是顺序执行的,这就是遇到了ioswoole底层自动切换让出cpu执行权。

Channle

用于协程间通讯,支持多生产者协程和多消费者协程。底层自动实现了协程的切换和调度。

执行下面代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
$start = time();
function task1()
{
Swoole\Coroutine::sleep(3); // 模拟io阻塞
echo "发短信\n";
}

function task2()
{
Swoole\Coroutine::sleep(3); // 模拟io阻塞
echo "发邮件\n";
}

Swoole\Coroutine::create('task1');
Swoole\Coroutine::create('task2');
echo '总耗时' . (time() - $start) . 's' . PHP_EOL;

执行结果:

1
2
3
总耗时0s
发短信
发邮件

却发现以上代码,先执行的echo '总耗时' . (time() - $start) . 's' . PHP_EOL;

要等待task1task2执行成功后输出,该怎么半呢,这就利用了channel,来实现csp并发编程。

代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
<?php
Swoole\Coroutine::create(function (){
$start = time();
$channel = new Swoole\Coroutine\Channel();
function task1($channel)
{
/** @var Swoole\Coroutine\Channel $channel */
Swoole\Coroutine::sleep(3); // 模拟io阻塞
$channel->push("发短信\n");
}

function task2($channel)
{
/** @var Swoole\Coroutine\Channel $channel */
Swoole\Coroutine::sleep(3); // 模拟io阻塞
$channel->push("发邮件\n");
}

Swoole\Coroutine::create('task1', $channel);
Swoole\Coroutine::create('task2', $channel);

for ($i = 0; $i < 2; $i++) {
echo $channel->pop();
}

echo '总耗时' . (time() - $start) . 's' . PHP_EOL;
});

执行结果:

1
2
3
发短信
发邮件
总耗时3s

可以看到耗时3s,但我们在增加一个任务,for里面的$i就要修改,使得我们的代码非常繁琐,所以就有了WaitGroup

channel可以实现协程通信,依赖管理,协程同步。

实现连接池功能可以看我之前的文章,传送门

WaitGroup

基于Channel实现的Golangsync.WaitGrup功能。

方法:

  • add 方法增加计数
  • done 表示任务已完成
  • wait 等待所有任务完成恢复当前协程的执行
  • WaitGroup 对象可以复用,adddonewait 之后可以再次使用

代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
<?php
Swoole\Coroutine::create(function () {

$start = time();
$waitGroup = new Swoole\Coroutine\WaitGroup();
function task1($waitGroup)
{
/** @var WaitGroup $waitGroup */
Swoole\Coroutine::sleep(3); // 模拟io阻塞
echo "发短信\n";
$waitGroup->done();;
}

function task2($waitGroup)
{
/** @var WaitGroup $waitGroup */
Swoole\Coroutine::sleep(3); // 模拟io阻塞
echo "发邮件\n";
$waitGroup->done();

}

$waitGroup->add();
Swoole\Coroutine::create('task1', $waitGroup);

$waitGroup->add();
Swoole\Coroutine::create('task2', $waitGroup);

$waitGroup->wait();

echo '总耗时' . (time() - $start) . 's' . PHP_EOL;
});

执行结果跟之前一样,也是好时3s,但是不是更简单了呢。

Context

协程原有的异步逻辑同步化,但是在协程切换是隐式发生的,所有协程切换的前后不能保证全局遍历及static变量的一致性。

context用协程id做隔离,来保存上下文内容。

代码复现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
<?php
class Email
{
static $email = null;
}

Swoole\Coroutine::create(function () {

function task1($email)
{
Email::$email = $email;
Swoole\Coroutine::sleep(3); // 模拟io阻塞
echo "发邮件:" . Email::$email . PHP_EOL;
}

function task2($email)
{
Email::$email = $email;
echo "发邮件:" . Email::$email . PHP_EOL;

}

Swoole\Coroutine::create('task1', '[email protected]');

Swoole\Coroutine::create('task2', '[email protected]');

});

从感觉啥觉得会输出两个邮箱地址,但其实:

1
2
发邮件:[email protected]
发邮件:[email protected]

这就是变量生命周期,需要注意,我们可以封装一个类来保存上下文。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
<?php
class Context
{
/**
* [
* 'cid' => [ // 就是协程的id
* 'key' => 'value' // 保存的全局变量的信息
* ]
* ]
* @var [type]
*/
public static $pool = [];

static function get($key)
{
$cid = Swoole\Coroutine::getuid();// 获取当前运行的协程id
if ($cid < 0) {
return null;
}
if (isset(self::$pool[$cid][$key])) {
return self::$pool[$cid][$key];
}
return null;
}

static function put($key, $item)
{
$cid = Swoole\Coroutine::getuid();// 获取当前运行的协程id
if ($cid > 0) {
self::$pool[$cid][$key] = $item;
}
}

static function delete($key = null)
{
$cid = Swoole\Coroutine::getuid();
if ($cid > 0) {
if ($key) {
unset(self::$pool[$cid][$key]);
} else {
unset(self::$pool[$cid]);
}
}
var_dump(self::$pool);
}
}

运行以下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
<?php
Swoole\Coroutine::create(function () {

function task1($email)
{
Context::put('email',$email);
Swoole\Coroutine::sleep(3); // 模拟io阻塞
echo "发邮件:" . Context::get('email') . PHP_EOL;
}

function task2($email)
{
Context::put('email',$email);
echo "发邮件:" . Context::get('email') . PHP_EOL;
}

Swoole\Coroutine::create('task1', '[email protected]');

Swoole\Coroutine::create('task2', '[email protected]');

var_dump(Context::$pool);
});

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
发邮件:[email protected]
array(2) {
[2]=>
array(1) {
["email"]=>
string(19) "[email protected]"
}
[3]=>
array(1) {
["email"]=>
string(20) "[email protected]"
}
}
发邮件:[email protected]

可以看到,两个邮箱都输出成功了,但是我们的变量没有销毁,如何销毁呢,Swoole提供了defer方法,在协程关闭之前会调用defer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
<?php
Swoole\Coroutine::create(function () {

function task1($email)
{
Context::put('email',$email);
Swoole\Coroutine::sleep(3); // 模拟io阻塞
echo "发邮件:" . Context::get('email') . PHP_EOL;
Swoole\Coroutine::defer(function (){
Context::delete();
});
}

function task2($email)
{
Context::put('email',$email);
echo "发邮件:" . Context::get('email') . PHP_EOL;
Swoole\Coroutine::defer(function (){
Context::delete();
});
}

Swoole\Coroutine::create('task1', '[email protected]');

Swoole\Coroutine::create('task2', '[email protected]');


});

运行结果:

1
2
3
4
5
6
7
8
9
10
11
发邮件:[email protected]
array(1) {
[2]=>
array(1) {
["email"]=>
string(19) "[email protected]"
}
}
发邮件:[email protected]
array(0) {
}

可以发现到最后为空,已经被清空掉了。

是不是觉得这样写很麻烦,以及不确定在什么时候销毁,然而Swoole提供的Context可以让协程退出后上下文自动清理 (如无其它协程或全局变量引用)。

代码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
<?php
Swoole\Coroutine::create(function () {

function task1($email)
{
$context = Swoole\Coroutine::getContext();
$context->email = $email;
Swoole\Coroutine::sleep(3); // 模拟io阻塞
echo "发邮件:" . $context->email . PHP_EOL;
}

function task2($email)
{
$context = Swoole\Coroutine::getContext();
$context->email = $email;
echo "发邮件:" . $context->email . PHP_EOL;
}

Swoole\Coroutine::create('task1', '[email protected]');

Swoole\Coroutine::create('task2', '[email protected]');

});

php yield关键字及协程实现

迭代器

迭代是指反复执行一个过程,每执行一次叫做迭代一次

php提供了统一的迭代器接口,之前文章我已经写过了。
通过实现Iterator接口,可以自行决定如何遍历。

生成器

相比迭代器,生成器提供了更容易的方法来简单实现对象的迭代,性能开销和复杂性大大降低。

一个生成器函数看起来更像一个普通的函数,不同的是普通函数返回的是一个值,而生成器可以yield生成许多个值。

生成器yield关键字不是返回值,而是返回Generator对象,不能被实力化,且继承了Iterator接口。

生成器优点:

  • 生成器会对php应用的性能有非常大的影响。

  • 代码运行时,节省大量内存。

  • 适合计算大量的数据。

颠覆常识的yield

大家都知道range函数创建一个包含指定范围的元素的数组。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<?php

$start = 1;
$end = 99999999999;

range($start,$end); // PHP Warning: range(): The supplied range exceeds the maximum array size: start=1 end=99999999999

function xrange($start, $end)
{
$result = [];
for ($i = $start; $i <= $end; $i++) {
$result[] = $i;
}
}
xrange(1, 99999999999);// Fatal error: Allowed memory size of 134217728 bytes exhausted (tried to allocate 134217736 bytes)

以上代码,创建1-99999999999的数组,range报错,用for来创建,会内存溢出。

接下来看个好玩的!!!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
<?php

$start = 1;
$end = 99999999999;


function xrange($start, $end)
{
for ($i = $start; $i < $end; $i++) {
yield $i;
}
}

$result = xrange($start, $end);

echo $result->current(); // 输出 1

$result->next();

echo $result->current(); // 输出 2

却发现能够正常输出数值!

那下面这样呢???

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
<?php

$start = 1;
$end = 3;


function yrange($start, $end)
{
for ($i = $start; $i < $end; $i++) {
echo '输出1' . PHP_EOL;
yield $i;
echo '输出2' . PHP_EOL;
}
}

echo '输出这个对象!!!!' . PHP_EOL;
var_dump(yrange($start, $end));
echo '对象输出结束!!!!'.PHP_EOL.PHP_EOL;

echo '遍历一次开始!!!!'.PHP_EOL;
foreach (yrange($start, $end) as $value) {
echo '遍历的数据'.$value . PHP_EOL;
break; // 我们遍历一次 就停止循环
};
echo '遍历一次结束!!!!'.PHP_EOL.PHP_EOL;


echo '一直遍历开始!!!!'.PHP_EOL;
foreach (yrange($start, $end) as $value) {
echo '遍历的数据'.$value . PHP_EOL;
};
echo '一直遍历结束!!!!'.PHP_EOL.PHP_EOL;

来看下运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
输出这个对象!!!!
object(Generator)#1 (0) {
}
对象输出结束!!!!

遍历一次开始!!!!
输出1
遍历的数据1
遍历一次结束!!!!

一直遍历开始!!!!
输出1
遍历的数据1
输出2
输出1
遍历的数据2
输出2
一直遍历结束!!!!

是不是懵逼了!!

  • 调用函数返回,却发现for竟然没有执行。
  • 就遍历一次,发现只执行了echo '输出1' . PHP_EOL;,而且也没有循环3次。

yield就是这样,有yield的函数被称为生成器函数。

yield实现协程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
<?php
function task1()
{
for ($i = 0; $i < 3; $i++) {
sleep(1); // 模拟耗时
echo "发短信{$i}\n";
}
}

function task2()
{
for ($i = 0; $i < 3; $i++) {
sleep(1); // 模拟阻塞
echo "发邮件{$i}\n";
}
}

task1();
task2();

以上代码可以看出,短信发完之后,才会发邮件,如果交替执行或者再添加任务应该怎么做呢。

多任务协作及调度器实现

为了实现我们的多任务调度,首先实现“任务”–一个用轻量级的包装的协程函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
<?php
class Task
{
protected $taskId; // 任务id
protected $coroutine; // 生成器
protected $sendValue = null; // 生成器send值
protected $beforeFirstYield = true; // 迭代的指针是否为第一个

public function __construct($taskId, Generator $coroutine)
{
$this->taskId = $taskId;
$this->coroutine = $coroutine;
}

public function getTaskId()
{
return $this->taskId;
}

public function setSendValue($sendValue)
{
$this->sendValue = $sendValue;
}

public function run()
{
if ($this->beforeFirstYield) {
$this->beforeFirstYield = false;
return $this->coroutine->current();
} else {
$retval = $this->coroutine->send($this->sendValue);
$this->sendValue = null;
return $retval;
}
}

public function isFinished()
{
return !$this->coroutine->valid();
}
}

如代码,一个任务就是用任务ID标记的一个协程(函数)。

使用setSendValue()方法,你可以指定哪些值将被发送到下次的恢复(在之后你会了解到我们需要这个)。

run()函数确实没有做什么,除了调用send()方法的协同程序, 要理解为什么添加了一个 beforeFirstYieldflag变量, 需要考虑下面的代码片段:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
<?php
function gen() {
yield 'foo';
yield 'bar';
}
$gen = gen();
var_dump($gen->send('something'));
// 如之前提到的在send之前, 当$gen迭代器被创建的时候一个renwind()方法已经被隐式调用
// 所以实际上发生的应该类似:
//$gen->rewind();
//var_dump($gen->send('something'));
//这样renwind的执行将会导致第一个yield被执行, 并且忽略了他的返回值.
//真正当我们调用yield的时候, 我们得到的是第二个yield的值! 导致第一个yield的值被忽略.
//string(3) "bar"

通过添加 beforeFirstYield我们可以确定第一个yield的值能被正确返回。

调度器现在不得不比多任务循环要做稍微多点了,然后才运行多任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
<?php
class Scheduler {
protected $maxTaskId = 0;
protected $taskMap = []; // taskId => task
protected $taskQueue;
public function __construct() {
$this->taskQueue = new SplQueue();
}
public function newTask(Generator $coroutine) {
$tid = ++$this->maxTaskId;
$task = new Task($tid, $coroutine);
$this->taskMap[$tid] = $task;
$this->schedule($task);
return $tid;
}
public function schedule(Task $task) {
$this->taskQueue->enqueue($task);
}
public function run() {
while (!$this->taskQueue->isEmpty()) {
$task = $this->taskQueue->dequeue();
$task->run();
if ($task->isFinished()) {
unset($this->taskMap[$task->getTaskId()]);
} else {
$this->schedule($task);
}
}
}
}

newTask()方法(使用下一个空闲的任务id)创建一个新任务,然后把这个任务放入任务map数组里,接着它通过把任务放入任务队列里来实现对任务的调度,接着run()方法扫描任务队列,运行任务。

如果一个任务结束了, 那么它将从队列里删除,否则它将在队列的末尾再次被调度。

让我们看看下面具有两个简单任务的调度器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
<?php
function task1()
{
for ($i = 0; $i < 3; $i++) {
sleep(1); // 模拟耗时
yield;
echo "发短信{$i}\n";
}
}

function task2()
{
for ($i = 0; $i < 3; $i++) {
sleep(1); // 模拟耗时
yield;
echo "发邮件{$i}\n";
}
}

$scheduler = new Scheduler;
$scheduler->newTask(task1());
$scheduler->newTask(task2());
$scheduler->run();

两个任务都仅仅回显一条信息,然后使用yield把控制回传给调度器。输出结果如下:

1
2
3
4
5
6
发短信0
发邮件0
发短信1
发邮件1
发短信2
发邮件2

输出确实如我们所期望的:对前五个迭代来说,两个任务是交替运行的。

调度器通信

上面实现了协程封装,但是调度器和任务直接缺少了通信,进行重新封装,使协程当中能够获取当前的任务id,新增任务,以及杀死任务。

系统调用

先封装系统调用:

1
2
3
4
5
6
7
8
9
10
11
<?php
class SystemCall {
protected $callback;
public function __construct(callable $callback) {
$this->callback = $callback;
}
public function __invoke(Task $task, Scheduler $scheduler) {
$callback = $this->callback;
return $callback($task, $scheduler);
}
}

它和其他任何可调用的对象(使用_invoke)一样的运行, 不过它要求调度器把正在调用的任务和自身传递给这个函数。
为了解决这个问题我们不得不微微的修改调度器的run方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
<?php
public function run() {
while (!$this->taskQueue->isEmpty()) {
$task = $this->taskQueue->dequeue();
$retval = $task->run();
if ($retval instanceof SystemCall) {
$retval($task, $this);
continue;
}
if ($task->isFinished()) {
unset($this->taskMap[$task->getTaskId()]);
} else {
$this->schedule($task);
}
}
}

获取任务

编写获取任务id函数:

1
2
3
4
5
6
7
<?php
function getTaskId() {
return new SystemCall(function(Task $task, Scheduler $scheduler) {
$task->setSendValue($task->getTaskId());
$scheduler->schedule($task);
});
}

重新编写我们的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
<?php
function task1()
{
$tid = (yield getTaskId()); // <-- here's the syscall!
for ($i = 0; $i < 3; $i++) {
sleep(1); // 模拟耗时
yield;
echo "发短信{$i}\n";
}
}

function task2()
{
$tid = (yield getTaskId()); // <-- here's the syscall!
for ($i = 0; $i < 3; $i++) {
sleep(1); // 模拟耗时
yield;
echo "发邮件{$i}\n";
}
}

$scheduler = new Scheduler;
$scheduler->newTask(task1());
$scheduler->newTask(task2());
$scheduler->run();

这段代码将给出与前一个例子相同的输出。请注意系统调用如何同其他任何调用一样正常地运行,只不过预先增加了yield。

新增任务

编写新增任务函数:

1
2
3
4
5
6
7
8
9
<?php
function newTask(Generator $coroutine) {
return new SystemCall(
function(Task $task, Scheduler $scheduler) use ($coroutine) {
$task->setSendValue($scheduler->newTask($coroutine));
$scheduler->schedule($task);
}
);
}

杀死任务

编写杀死任务函数:

1
2
3
4
5
6
7
8
9
<?php
function killTask($tid) {
return new SystemCall(
function(Task $task, Scheduler $scheduler) use ($tid) {
$task->setSendValue($scheduler->killTask($tid));
$scheduler->schedule($task);
}
);
}

同样我们也需要往调度器里面,增加一个方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<?php
public function killTask($tid) {
if (!isset($this->taskMap[$tid])) {
return false;
}
unset($this->taskMap[$tid]);
// This is a bit ugly and could be optimized so it does not have to walk the queue,
// but assuming that killing tasks is rather rare I won't bother with it now
foreach ($this->taskQueue as $i => $task) {
if ($task->getTaskId() === $tid) {
unset($this->taskQueue[$i]);
break;
}
}
echo "任务 $tid 被杀死\n";
return true;
}

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
<?php
function childTask()
{
$tid = (yield getTaskId());
while (true) {
echo "任务 $tid 执行\n";
yield;
}
}

function task1()
{
$tid = (yield getTaskId()); // <-- here's the syscall!
for ($i = 0; $i < 3; $i++) {
echo "发短信{$i}\n";
yield;
}
}

function task2()
{
$tid = (yield getTaskId()); // <-- here's the syscall!
$childTId = (yield newTask(childTask()));
for ($i = 0; $i < 3; $i++) {
echo "发邮件{$i}\n";
yield;
if ($i == 2) {
yield killTask($childTId);
}
}
}

$scheduler = new Scheduler;
$scheduler->newTask(task1());
$scheduler->newTask(task2());
$scheduler->run();

swoole实现协程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
<?php
Swoole\Coroutine::create(function (){
$start = time();
$wait = new Swoole\Coroutine\WaitGroup();
function task1()
{
for ($i = 0; $i < 3; $i++) {
echo "发短信{$i}\n";
Swoole\Coroutine::sleep(1); // 协程切换
}
}

function task2()
{
for ($i = 0; $i < 3; $i++) {
echo "发邮件{$i}\n";
Swoole\Coroutine::sleep(1); // 协程切换
}
}
$wait->add();
Swoole\Coroutine::create('task1');
$wait->add();
Swoole\Coroutine::create('task2');
$wait->wait();
echo '耗时' . (time() - $start);
});

以上代码可看出,耗时为6s,但运行结果确实3s,这就体现了协程的好处。

下一篇文章会具体写出swoole协程的用法。

以上yield实现协程 部分内容来自于 https://www.laruence.com/2015/05/28/3038.html


sl-im 基于 Swoft 微服务协程框架和 Layim 网页聊天系统 开发出来的聊天室

简介

sl-im 是基于 Swoft 微服务协程框架和 Layim 网页聊天系统 所开发出来的聊天室。

体验地址

sl-im https://im.stitch.cn

演示图

sl-im

功能

  • 登录注册(Http)
  • 单点登录(Websocket)
  • 私聊(Websocket)
  • 群聊(Websocket)
  • 在线人数(Websocket)
  • 获取未读消息(Websocket)
  • 好友在线状态(Websocket)
  • 好友 查找 添加 同意 拒绝(Http+Websocket)
  • 群 创建 查找 添加 同意 拒绝(Http+Websocket)
  • 聊天记录存储
  • 心跳检测
  • 消息重发
  • 断线重连

Requirement

部署方式

Composer

1
composer update

bean

app/bean.php

1
2
3
4
5
6
7
8
9
10
11
12
13
'db' => [
'class' => Database::class,
'dsn' => 'mysql:dbname=im;host=127.0.0.1:3306',
'username' => 'root',
'password' => 'gaobinzhan',
'charset' => 'utf8mb4',
],
'db.pool' => [
'class' => \Swoft\Db\Pool::class,
'database' => bean('db'),
'minActive' => 5, // 自己调下连接池大小
'maxActive' => 10
],

数据表迁移

php bin/swoft mig:up

env配置

vim .env

1
2
3
4
5
6
7
8
9
10
11
# basic
APP_DEBUG=0
SWOFT_DEBUG=0

# more ...
APP_HOST=https://im.stitch.cn/
WS_URL=ws://im.stitch.cn/im
# 是否开启静态处理 这里我关了 让nginx去处理
ENABLE_STATIC_HANDLER=false
# swoole v4.4.0以下版本, 此处必须为绝对路径
DOCUMENT_ROOT=/data/wwwroot/IM/public

nginx配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
server{
listen 80;
server_name im.stitch.cn;
return 301 https://$server_name$request_uri;
}

server{
listen 443 ssl;
root /data/wwwroot/IM/public/;
add_header Strict-Transport-Security "max-age=31536000";
server_name im.stitch.cn;
access_log /data/wwwlog/im-stitch.cn.access.log;
error_log /data/wwwlog/im-stitch.cn.error.log;
client_max_body_size 100m;
ssl_certificate /etc/nginx/ssl/full_chain.pem;
ssl_certificate_key /etc/nginx/ssl/private.key;
ssl_session_timeout 5m;
ssl_protocols TLSv1.1 TLSv1.2 TLSv1.3;
ssl_ciphers ECDHE-RSA-AES128-GCM-SHA256:HIGH:!aNULL:!MD5:!RC4:!DHE;
location / {
proxy_pass http://127.0.0.1:9091;
proxy_set_header Host $host:$server_port;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Real-PORT $remote_port;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
}
location /im {
proxy_pass http://127.0.0.1:9091;
proxy_http_version 1.1;
proxy_read_timeout 3600s;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
}
location ~ .*\.(js|ico|css|ttf|woff|woff2|png|jpg|jpeg|svg|gif|htm)$ {
root /data/wwwroot/IM/public;
}
}

Start

  • 挂起
1
php bin/swoft ws:start
  • 守护进程化
1
php bin/swoft ws:start -d
  • 访问

怎么访问还用写吗???点个star吧 ✌️

联系方式

  • QQ:975975398

微服务架构常见的分布式事务解决方案

场景问题

多个服务,位于不同主机,不同的网络当中,没有办法用本地事务保证要么一起成功,要么一起失败。

BASE理论

BA: Basic Availability 基本业务可用性(支持分区失败)

S: Soft state 柔性状态(状态允许有短时间不同步,异步)

E: Eventual consistency 最终一致性(最终数据是一致的,但不是实时一致)

原子性(A)与持久性(D)必须根本保障

为了可用性、性能与降级服务的需要,只有降低一致性( C ) 与 隔离性( I ) 的要求

酸碱平衡(ACID-BASE Balance)

CAP定理

对于共享数据系统,最多只能同时拥有CAP其中的两个,没法三者兼顾。

任意两者的组合都有其适用场景

真实系统应当是ACID与BASE的混合体

不同类型的业务可以也应当区别对待

CAP定理

解决方案

可靠消息最终一致性(异步确保型)

实现

业务处理服务在业务事务提交前,向实时消息服务请求发送消息,实时消息服务只记录消息数据,而不真正发送。业务处理服务在业务事务提交后,向实时消息服务确认发送。只有在得到确认发送指令后,实时消息服务才真正发送

消息

业务处理服务在业务事务回滚后,向实时消息服务取消发送。消息状态确认系统定期找到未确认发送或回滚发送的消息,向业务处理服务询问消息状态,业务处理服务根据消息ID或消息内容确定该消息是否有效

约束

被动方的处理结果不影响主动方的处理结果,被动方的消息处理操作是幂等操作

成本

可靠消息系统建设成本

一次消息发送需要两次请求,业务处理服务需实现消息状态回查接口

优点、适用范围

消息数据独立存储、独立伸缩,降低业务系统与消息系统间的耦合

方案特点

兼容所有实现AMQP标准的MQ中间件

确保业务数据可靠的前提下,实现业务数据的最终一致(理想状态下基本是准实时一致)

柔性事务解决方案 TCC(两阶段型、补偿型)

实现

一个完整的业务活动由一个主业务服务与若干从业务服务组成

主业务服务负责发起并完成整个业务活动

从业务服务提供TCC型业务操作

业务活动管理器控制业务活动的一致性,它登记业务活动中的操作,并在业务活动提交时确认所有的TCC型操作的confirm操作,在业务活动取消时调用所有TCC型操作的cancel操作

成本

实现TCC操作的成本

业务活动结束时confirm或cancel操作的执行成本

业务活动日志成本

适用范围

强隔离性、严格一致性要求的业务活动

适用于执行时间较短的业务(比如处理账户、收费等业务)

用到的服务模式

TCC操作、幂等操作、可补偿操作、可查询操作

方案特点

不与具体的服务框架耦合(在RPC架构中通用)

位于业务服务层,而非资源层

可以灵活选择业务资源的锁定粒度

最大努力通知型

实现

业务活动的主动方,在完成业务处理之后,向业务活动的被动方发送消息,允许消息丢失。

业务活动的被动方根据定时策略,向业务活动主动方查询,恢复丢失的业务消息。

约束

被动方的处理结果不影响主动方的处理结果

成本

业务查询与校对系统的建设成本

使用范围

对业务最终一致性的时间敏感度低

跨企业的业务活动

方案特点

业务活动的主动方在完成业务处理后,向业务活动被动方发送通知消息(允许消息丢失)主动方可以设置时间阶梯型通知规则,在通知失败后按规则重复通知,直到通知N次后不主动方提供校对查询接口给被动方按需校对查询,用于恢复丢失的业务消息

应用案例

银行通知、商户通知等(各大交易业务平台间的商户通知:多次通知、查询校对、对账文件)


Swoole协程模式实现Mysql连接池

连接池定义

永不断开,要求我们的这个程序是一个常驻内存的程序。数据库连接池(Connection pooling)是程序启 动时建立足够的数据库连接,并将这些连接组成一个连接池,由程序动态地对池中的连接进行申请,使用,释放。

为什么需要连接池?

当并发量很低的时候,连接可以临时建立,但当服务吞吐达到几百、几千的时候,频繁 建立连接 Connect销毁连接 Close 就有可能会成为服务的一个瓶颈,那么当服务启动的时候,先建立好若干个连接并存放于一个队列中,当需要使用时从队列中取出一个并使用,使用完后再反还到队列去,而对这个队列数据结构进行维护的,就是连接池。

使用channel实现连接池

必须在协程模式下

Pool.php

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
<?php


class PdoPool
{
/**
* @var Swoole\Coroutine\Channel
*/
protected $channel;

/**
* @var int 最大连接数
*/
protected $maxActive = 30;

/**
* @var int 最少连接数
*/
protected $minActive = 10;

/**
* @var int 最大等待连接数
*/
protected $maxWait = 200;

/**
* @var float 最大等待时间 -1 永不超时
*/
protected $maxWaitTime = 1.2;

/**
* @var int 最大空闲时间s
*/
protected $maxIdleTime = 5;

/**
* @var int 自动检查时间ms
*/
protected $checkTime = 3000;

/**
* @var int 当前连接数
*/
protected $count = 0;

/**
* @var self
*/
protected static $instance = null;


/**
* PdoPool constructor.
*/
private function __construct()
{
// 初始化连接池
$this->init();
// 定时器进行空闲连接释放
$this->recovery();
}

/**
* @return PdoPool|null
*/
public static function getInstance()
{
if (!self::$instance instanceof self) {
self::$instance = new self();
}
return self::$instance;
}

/**
* 连接池初始化
*/
protected function init()
{
for ($i = 0; $i < $this->minActive; $i++) {
$connection = $this->getConnection();
if ($connection) $this->channel->push($connection);
}
}

public function getConnection(){
return $this->getConnectionByChannel();
}

private function getConnectionByChannel()
{
// 创建Channel
if ($this->channel === null) {
$this->channel = new Swoole\Coroutine\Channel($this->maxActive);
}

// 小于连接池内最小连接数
if ($this->count < $this->minActive) {
return $this->createConnection();
}

// 取出连接
$connection = null;
if (!$this->channel->isEmpty()) {
$connection = $this->popConnection();
}

//检测连接是否正常
if ($connection !== null && $connection['connection'] instanceof PDO) {
return $connection;
}

//未取出连接 判断是否大于最大连接数进行创建
if ($this->count < $this->maxActive) {
return $this->createConnection();
}


//查看协程挂起数
$stats = $this->channel->stats();
if ($this->maxWait > 0 && $stats['consumer_num'] >= $this->maxWait) {
echo '协程挂起数已大于最大等待数' . PHP_EOL;
}

//重新取出连接
$connection = $this->channel->pop($this->maxWaitTime);
if ($connection == false) {
echo '获取连接失败' . PHP_EOL;
}
return $connection;
}

private function createConnection()
{
// 因为堵塞问题 会造成当前连接数大于最大连接数 先进行++
$this->count++;
try {
$connection = new PDO('mysql:host=localhost;dbname=test', 'root', 'gaobinzhan');
$connection->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
return [
'connection' => $connection,
'lastUsedTime' => time()
];
} catch (\Throwable $throwable) {
//失败--
$this->count--;
}
}


private function popConnection()
{
while (!$this->channel->isEmpty()) {
$connection = $this->channel->pop();
return $connection;
}
return null;
}

public function freeConnection($connection)
{
//放入连接池
$stats = $this->channel->stats();
if ($stats['queue_num'] < $this->maxActive) {
$connection['lastUsedTime'] = time();
$this->channel->push($connection);
}
}

/**
* 自动回收空闲连接
*/
private function recovery()
{
swoole_timer_tick($this->checkTime, function () {
while ($this->count > $this->minActive && !$this->channel->isEmpty()) {
$connection = $this->channel->pop($this->maxWaitTime);
if (!$connection) {
continue;
}
if ((time() - $connection['lastUsedTime']) > $this->maxIdleTime) {
$this->count--;
$connection['connection'] = null;
echo "回收成功" . PHP_EOL;
} else {
$this->channel->push($connection);
}
}
});
}

private function __clone()
{

}

}

这里生成的是Pdo连接池同理可自行修改createConnection方法生成其它连接池

可用AST语法树进行多种连接池配置

大佬勿喷 嘿嘿!


php用select实现I/O复用

前言

在Linux Socket服务器短编程时,为了处理大量客户的连接请求,需要使用非阻塞I/O和复用,select、poll和epoll是Linux API提供的I/O复用方式,其实I/O多路复用就是通过一种机制,可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。但select,poll,epoll本质上都是同步I/O,因为他们都需要在读写事件就绪后自己负责进行读写,也就是说这个读写过程是阻塞的。现在比较受欢迎的的nginx就是使用epoll来实现I/O复用支持高并发,所以理解好select,poll,epoll对于nginx如何应对高并发还是很有帮助的。

select调用过程

select

select缺点

  1. 单个进程监控的文件描述符有限,通常为1024*8个文件描述符。

    当然可以改进,由于select采用轮询方式扫描文件描述符。文件描述符数量越多,性能越差。

  2. 内核/用户数据拷贝频繁,操作复杂。

    select在调用之前,需要手动在应用程序里将要监控的文件描述符添加到fed_set集合中。然后加载到内核进行监控。用户为了检测时间是否发生,还需要在用户程序手动维护一个数组,存储监控文件描述符。当内核事件发生,在将fed_set集合中没有发生的文件描述符清空,然后拷贝到用户区,和数组中的文件描述符进行比对。再调用selecct也是如此。每次调用,都需要了来回拷贝。

  3. 轮回时间效率低

    select返回的是整个数组的句柄。应用程序需要遍历整个数组才知道谁发生了变化。轮询代价大。

  4. select是水平触发

    应用程序如果没有完成对一个已经就绪的文件描述符进行IO操作。那么之后select调用还是会将这些文件描述符返回,通知进程。

代码实现

Worker.php

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
<?php
/**
* @author gaobinzhan <[email protected]>
*/

class Worker
{

private $socket;

public $onReceive;

public $onConnect;

public $onClose;

private $socketList = [];

private $events = [
'connect' => 'onConnect',
'receive' => 'onReceive',
'close' => 'onClose'
];

public function __construct($host, $port, $type)
{
$this->socket = stream_socket_server("{$type}://{$host}:{$port}");
stream_set_blocking($this->socket,0); // Set non blocking
$this->socketList[(int)$this->socket] = $this->socket;
return $this->socket;
}


private function accept()
{
while (true) {
$write = $except = [];
$read = $this->socketList;
stream_select($read,$write,$except,60);
foreach ($read as $socket) $socket === $this->socket ? $this->createSocket() : $this->receive($socket);
}
}

private function createSocket(){
//Establish a connection with the client
$client = stream_socket_accept($this->socket);
(!empty($client && is_callable($this->onConnect))) && call_user_func_array($this->onConnect, [$this->socket, $client]);
$this->socketList[(int)$client] = $client;
}

private function receive($client){

$buffer = fread($client, 65535);
if (empty($buffer) && (feof($client) || !is_resource($client))) { fclose($client); unset($this->socketList[(int)$client]); }
!empty($buffer) && is_callable($this->onReceive) && call_user_func_array($this->onReceive, [$this->socket, $client, $buffer]);


//because:IO Multiplexing
/*$close = fclose($v);
if (!empty($close) && is_callable($this->onClose)) call_user_func_array($this->onClose, [$v]);*/
}

public function send($client, $data)
{
$response = "HTTP/1.1 200 OK\r\n";
$response .= "Content-Type: text/html;charset=UTF-8\r\n";
$response .= "Connection: keep-alive\r\n";
$response .= "Content-length: ".strlen($data)."\r\n\r\n";
$response .= $data;
fwrite($client, $response);
}

public function on($event, $callback)
{
$event = $this->events[$event] ?? null;
$this->$event = $callback;
}

public function start()
{
$this->accept();
}
}

server.php

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
<?php
/**
* @author gaobinzhan <[email protected]>
*/

require 'Worker.php';


class Server{

public $server;

public function __construct($host, $port, $type)
{
$this->server = new Worker($host, $port, $type);
$this->server->on('connect',[$this,"onConnect"]);
$this->server->on('receive',[$this,"onReceive"]);
$this->server->on('close',[$this,"onClose"]);

$this->server->start();
}

public function onConnect($server,$fd){
echo 'onConnect '.$fd.PHP_EOL;
}

public function onReceive($server,$fd,$data){
echo 'onReceive '.$fd.PHP_EOL;
$this->server->send($fd,'this is server !!!');
}

public function onClose($fd){
echo 'onClose '.$fd.PHP_EOL;
}
}
new Server('0.0.0.0','9501','tcp');

cli模式下运行server.php

浏览器第一次访问http://127.0.0.1:9501

发现第一次连接符为7

第一次

第二访问 发现为7的连接符被复用了

第二次

可以用ab测试工具 更能体现出io复用

ab -c 100 -n 100000 -k http://127.0.0.1:9501/


Linux五大网络IO模型图解

对于一个应用程序即一个操作系统进程来说,它既有内核空间(与其他进程共享),也有用户空间(进程私有),它们都是处于虚拟地址空间中。用户进程是无法访问内核空间的,它只能访问用户空间,通过用户空间去内核空间复制数据,然后进行处理。

阻塞io(同步io)

发起请求就一直等待,直到数据返回。好比你去商场试衣间,里面有人,那你就一直在门外等着。(全程阻塞)

阻塞io(同步io).png

非阻塞io(同步io)

不管有没有数据都返回,没有就隔一段时间再来请求,如此循环。好比你要喝水,水还没烧开,你就隔段时间去看一下饮水机,直到水烧开为止。(复制数据时阻塞)

非阻塞io(同步io).png

io复用(同步io)

I/O是指网络I/O,多路指多个TCP连接(即socket或者channel),复用指复用一个或几个线程。意思说一个或一组线程处理多个连接。比如课堂上学生做完了作业就举手,老师就下去检查作业。(对一个IO端口,两次调用,两次返回,比阻塞IO并没有什么优越性;关键是能实现同时对多个IO端口进行监听,可以同时对多个读/写操作的IO函数进行轮询检测,直到有数据可读或可写时,才真正调用IO操作函数。) 
io复用(同步io).png

信号驱动io(同步io)

事先发出一个请求,当有数据后会返回一个标识回调,这时你可以去请求数据。好比银行排号,当叫到你的时候,你就可以去处理业务了(复制数据时阻塞)。
信号驱动io(同步io).png

异步io

发出请求就返回,剩下的事情会异步自动完成,不需要做任何处理。好比有事秘书干,自己啥也不用管。  
异步io.png

总结

五种IO的模型:阻塞IO、非阻塞IO、多路复用IO、信号驱动IO和异步IO;前四种都是同步IO,在内核数据copy到用户空间时都是阻塞的。

阻塞IO和非阻塞IO的区别在于第一步,发起IO请求是否会被阻塞,如果会那就是传统的阻塞IO,如果不会那就是非阻塞IO。

同步IO和异步IO的区别就在于第二个步骤是否阻塞,如果实际的IO读写阻塞请求进程,那么就是同步IO;如果不阻塞,而是操作系统帮你做完IO操作再将结果返回给你,那么就是异步IO


docker实现redis主从复制

主从复制说明

面临问题

在实际的场景当中单一节点的redis容易面临风险。
比如:

  • 机器故障。我们部署到一台 Redis 服务器,当发生机器故障时,需要迁移到另外一台服务器并且要保证数据是同步的。而数据是最重要的,如果你不在乎, 基本上也就不会使用 Redis 了。

要实现分布式数据库的更大的存储容量和承受高并发访问量,我们会将原来集中式数据库的数据分别存储到其他多个网络节点上。

Redis 为了解决这个单一节点的问题,也会把数据复制多个副本部署到其他节点上进行复制,实现 Redis的高可用,实现对数据的冗余备份,从而保证数据和服务 的高可用。

什么是主从复制

  • 主从复制,是指将一台Redis服务器的数据,复制到其他的Redis服务器。前者称为主节点(master),后者称为从节点(slave),数据的复制是单向的,只能由主节点到 从节点。
  • 默认情况下,每台Redis服务器都是主节点,且一个主节点可以有多个从节点(或没有从节点),但一个从节点只能有一个主节点。

主从复制的作用

  • 数据冗余:主从复制实现了数据的热备份,是持久化之外的一种数据冗余方式。
  • 故障恢复:当主节点出现问题时,可以由从节点提供服务,实现快速的故障恢复;实际上是一种服务的冗余。
  • 负载均衡:在主从复制的基础上,配合读写分离,可以由主节点提供写服务,由从节点提供读服务(即写Redis数据时应用连接主节点,读Redis数据时应用连接从节点) 分担服务器负载;尤其是在写少读多的场景下,通过多个从节点分担读负载,可以大大提高Redis服务器的并发量。
  • 读写分离:可以用于实现读写分离,主库写、从库读,读写分离不仅可以提高服务器的负载能力,同时可根据需求的变化,改变从库的数量;
  • 高可用基石:除了上述作用以外,主从复制还是哨兵和集群能够实施的基础,因此说主从复制是Redis高可用的基础。

主从复制启用

从节点开启主从复制,有3种方式:

  1. 配置文件
    在从服务器的配置文件中加入:slaveof
    不推荐使用 配置文件可被动态修改
  2. 启动命令
    redis-server启动命令后加入 –slaveof
  3. 客户端命令
    Redis服务器启动后,直接通过客户端执行命令:slaveof ,则该Redis实例成为从节点。
    通过 info replication 命令可以看到复制的一些参数信息

主从复制原理

主从复制的原理以及过程必须要掌握,这样我们才知道为什么会出现这些问题 主从复制过程大体可以分为3个阶段:连接建立阶段(即准备阶段)、数据同步阶段、命令传播阶段。
在从节点执行 slaveof 命令后,复制过程便开始运作,下面图示大概可以看到, 从图中可以看出复制过程大致分为6个过程

构建

dockerfile构建redis镜像

DockerFile

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
FROM alpine
RUN sed -i 's/dl-cdn.alpinelinux.org/mirrors.aliyun.com/g' /etc/apk/repositories \
&& apk add gcc g++ libc-dev wget vim openssl-dev make linux-headers \
&& rm -rf /var/cache/apk/*

#通过选择更小的镜像,删除不必要文件清理不必要的安装缓存,从而瘦身镜像
#创建相关目录能够看到日志信息跟数据跟配置文件
RUN mkdir -p /usr/src/redis \
&& mkdir -p /usr/src/redis/data \
&& mkdir -p /usr/src/redis/conf \
&& mkdir -p /usr/src/redis/log \
&& mkdir -p /var/log/redis


RUN wget -O /usr/src/redis/redis-4.0.11.tar.gz "http://download.redis.io/releases/redis-4.0.11.tar.gz" \
&& tar -xzf /usr/src/redis/redis-4.0.11.tar.gz -C /usr/src/redis \
&& rm -rf /usr/src/redis/redis-4.0.11.tar.tgz

RUN cd /usr/src/redis/redis-4.0.11 && make && make PREFIX=/usr/local/redis install \
&& ln -s /usr/local/redis/bin/* /usr/local/bin/ && rm -rf /usr/src/redis/redis-4.0.11

#COPY ./conf/redis.conf /usr/src/redis/conf

CMD ["/usr/local/bin/redis-server","/usr/src/redis/conf/redis.conf"]

切换到当前dockfile文件目录下 执行命令

docker build -t redis .

等待构建完成就可以了

docker创建自定义网络及redis主从集群规划

执行自定义网络命令

docker network create --subnet=192.168.1.0/24 redis-network

容器名称 容器IP地址 映射端口号 宿主机IP地址 服务运行模式
redis-master 192.168.1.2 6380->6379 127.0.0.1 master
redis-slave 192.168.1.3 6381->6379 127.0.0.1 slave

docker启动容器

在 /data/ 下面创建该目录结构

data和log下面文件忽略

目录代码

提取码:r8r1

下载完放到 /data 下面

用户可自行定义宿主机目录

master
docker run -itd --name redis-master --net redis-network -v /data/redis/master:/usr/src/redis -p 6380:6379 --ip 192.168.1.2 redis

slave
docker run -itd --name redis-slave --net redis-network -v /data/redis/slave:/usr/src/redis -p 6381:6379 --ip 192.168.1.3 redis

测试主从复制

主从的配置文件 没有设置redis密码

开两个终端分别执行

docker exec -it redis-master sh

docker exec -it redis-slave sh

进入容器后执行(都要执行)

redis-cli

在slave执行

SLAVEOF 192.168.1.2 6379

info repliaction

master_link_status为up就成功了!

当前是通过内网连接 端口号为6379

如果通过宿主机IP连接 端口号为6380

这时候在master进行操作 就可以看到slave的变化了!