1. citus介绍

citus是PostgreSQL数据库中的一种轻量级的分库分表解决方案。citus不是一个单独的程序,它是PostgreSQL数据库中的一个插件,可以使用create extension安装此插件。

每个citus集群有多个PostgreSQL数据库实例组成,数据库实例分为两类:

  • master节点,通常有一台。master节点只存储分库分表的元数据,不存储实际的数据。
  • worker节点,通常有多台。worker节点存储实际的分片数据(shard)。

用户只连接master节点,即用户把SQL发到master节点,然后master节点解析SQL,把SQL分解成各个子SQL发送到底层的worker节点,完成SQL的处理。

例如用户发一条如下的SQL到master节点:

  1. SELECT reponse_type, avg(reponse_time) as responsetime_avg
  2. FROM events
  3. WHERE reponse_timestamp > '2013-11-01 23:20:00'
  4. AND reponse_timestamp < '2013-11-01 23:30:00'
  5. GROUP BY response_type
  6. ORDER BY reponsetime_avg DESC;

master节点会把这条SQL分解成如下的子SQL发到底层的各个worker上:

  1. SELECT reponse_type, sum(reponse_time), count(response_time)
  2. FROM events_XXX
  3. WHERE reponse_timestamp > '2013-11-01 23:20:00'
  4. AND reponse_timestamp < '2013-11-01 23:30:00'
  5. GROUP BY response_type
  6. ORDER BY reponsetime_avg DESC;

上面的SQL中的“events_XXX”中的“XXX”代表分片号。当各个workder把数据返给master之后,master再做一次聚合运算,然后把结果返回用户。

一些更多的信息可以见citus的官方网站为:https://www.citusdata.com/

2. 安装citus

2.1 从二进制包安装

citus是一个插件,在ubuntu下只需要使用apt-get安装即可:

  1. aptitude install postgresql-9.6 postgresql-9.6-citus

装完之后,可以检查一下安装上的包:

  1. root@vds01:~# dpkg -l |grep postgresql-9.6
  2. ii postgresql-9.6 9.6.2-1.pgdg14.04+1 amd64 object-relational SQL database, version 9.6 server
  3. ii postgresql-9.6-citus 6.0.1.PGDG-1.pgdg14.04+1 amd64 sharding and distributed joins for PostgreSQL

2.2 编译安装

如果只是编译安装citus,而PostgreSQL数据库软件是通过操作系统包安装的,这时需要安装PostgreSQL开发包:

  1. aptitude install postgresql-server-dev-9.6

然后从github下载源代码:

  1. git clone https://github.com/citusdata/citus.git

编译源代码:

  1. cd citus
  2. ./configure
  3. make

在make时会报错:

  1. /usr/include/postgresql/9.6/server/libpq/libpq-be.h:36:27: fatal error: gssapi/gssapi.h: No such file or directory
  2. #include <gssapi/gssapi.h>
  3. ^
  4. compilation terminated.
  5. make[1]: *** [commands/transmit.o] Error 1
  6. make[1]: Leaving directory `/data/citus/src/backend/distributed'
  7. make: *** [extension] Error 2

安装libkrb5-dev包解决上面的问题:

  1. aptitude install libkrb5-dev

然后再编译就没有问题了。

3. 创建citus集群

3.1 集群规划

因为本次安装只是做一个演示,所以我们把所有的数据库实例都建在一台机器上。本次创建三个实例:

实例名称 操作系统用户名 数据目录 端口
实例01 pg01 /data/pg01 5432
实例02 pg02 /data/pg02 9702
实例03 pg03 /data/pg03 9703

3.2 建操作系统用户

  1. groupadd -g 501 pg01
  2. useradd -g pg01 -m -s /bin/bash -u 501 pg01
  3. groupadd -g 502 pg02
  4. useradd -g pg02 -m -s /bin/bash -u 502 pg02
  5. groupadd -g 503 pg03
  6. useradd -g pg03 -m -s /bin/bash -u 503 pg03

在pg01用户的.profile文件中添加如下内容:

  1. export PATH=/usr/lib/postgresql/9.6/bin:$PATH
  2. export LD_LIBRARY_PATH=/usr/lib/postgresql/9.6/bin:$LD_LIBRARY_PATH
  3. export PGHOST=/tmp
  4. export PGDATA=/data/pg01
  5. export PGPORT=5432

在pg02用户的.profile文件中添加如下内容:

  1. export PATH=/usr/lib/postgresql/9.6/bin:$PATH
  2. export LD_LIBRARY_PATH=/usr/lib/postgresql/9.6/bin:$LD_LIBRARY_PATH
  3. export PGHOST=/tmp
  4. export PGDATA=/data/pg02
  5. export PGPORT=9702

在pg03用户的.profile文件中添加如下内容:

  1. export PATH=/usr/lib/postgresql/9.6/bin:$PATH
  2. export LD_LIBRARY_PATH=/usr/lib/postgresql/9.6/bin:$LD_LIBRARY_PATH
  3. export PGHOST=/tmp
  4. export PGDATA=/data/pg03
  5. export PGPORT=9703

3.3 建数据库实例

分别在三个操作系统用户pg01、pg02、pg03下建三个数据库实例。

建的方法为:

  1. initdb

然后分别修改各个实例下的postgresql.conf配置文件。

其中pg01需要修改的配置项如下:

  1. listen_addresses = '*'
  2. port = 5432
  3. unix_socket_directories = '/tmp'
  4. shared_preload_libraries = 'citus'
  5. logging_collector = on

其中pg02需要修改的配置项如下:

  1. listen_addresses = '*'
  2. port = 9702
  3. unix_socket_directories = '/tmp'
  4. shared_preload_libraries = 'citus'
  5. logging_collector = on

其中pg03需要修改的配置项如下:

  1. listen_addresses = '*'
  2. port = 9703
  3. unix_socket_directories = '/tmp'
  4. shared_preload_libraries = 'citus'
  5. logging_collector = on

注意上面配置项中的“shared_preload_libraries = ‘citus’”,这一行就是为了装载citus插件。

创建完成后,就可以启动这三个数据库实例了:

启动pg01实例,在root用户下:

  1. su - pg01
  2. pg_ctl start

启动pg02实例,在root用户下:

  1. su - pg02
  2. pg_ctl start

启动pg03实例,在root用户下:

  1. su - pg03
  2. pg_ctl start

3.4 配置citus

在pg01、pg02、pg03这三个实例中分别执行下面的命令创建citus extension:

  1. create extension citus;

为了方便后续的操作,创建一个用户citusr,以后就会用citusr用户测试citus的功能:

  1. create user citusr superuser;

上面是为了方便,所以把用户创建成超级用户。实际上也可以创建成普通用户。

然后在pg01即master节点上,添加两个worker节点(即pg02和pg03):

  1. SELECT * from master_add_node('localhost', 9702);
  2. SELECT * from master_add_node('localhost', 9702);

增加完之后可以用下面的命令看是否增加成功:

  1. postgres=# SELECT * FROM master_get_active_worker_nodes();
  2. node_name | node_port
  3. -----------+-----------
  4. localhost | 9703
  5. localhost | 9702
  6. (2 rows)

做完以上操作之后,就可以创建表以及试用citus的功能了。

3.5 创建表以及测试citus的功能

在citus中有两类表:

  • Distributed Tables:即分片表。表的内容通过hash分在各个worker节点中
  • Reference Tables:即广播表,在每个分片中都复制一份。

下面我们创建两个分片表,注意这时使用我们之前建的用户citusr连接pg01:

  1. pg01@vds01:~$ psql -Ucitusr postgres
  2. psql (9.6.2)
  3. Type "help" for help.
  4. postgres=#

先按照与单机PostgreSQL数据库相同的方式建这两张表:

  1. create table t01(id int, id2 int, t text);
  2. create table t02(id int, id2 int, t text);

然后用下面的语句把这两张表定义为分片表:

  1. select create_distributed_table('t01', 'id2');
  2. select create_distributed_table('t02', 'id2', colocate_with=>'t01');

用下面的语句造一些临时数据,发现不支持:

  1. postgres=# insert into t01 select id, id, lpad(id::text, 5, id::text) from generate_series(1,100) as t(id);
  2. ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match
  3. DETAIL: Subquery contains an expression that is not a simple column reference in the same position as the target table's partition column.
  4. HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery.

从上面知道citus不支持insert 语句后面的SELECT另一张表的这种语法。

不过我们可以通过下面的方法来造数据:

  1. copy (select id, id, lpad(id::text, 5, id::text) from generate_series(1,10000) as t(id)) to '/tmp/t01.txt';
  2. copy t01 from '/tmp/t01.txt';
  3. copy t02 from '/tmp/t01.txt';

从上面知道citus是支持copy命令的。

我们使用explain查看执行计划:

  1. postgres=# explain select a.t, b.t from t01 a, t02 b where a.id2=b.id2 and a.id2=5;
  2. QUERY PLAN
  3. -------------------------------------------------------------------------------
  4. Distributed Query
  5. Executor: Router
  6. Task Count: 1
  7. Tasks Shown: All
  8. -> Task
  9. Node: host=localhost port=9702 dbname=postgres
  10. -> Nested Loop (cost=0.00..11.61 rows=1 width=12)
  11. -> Seq Scan on t01_102050 a (cost=0.00..5.80 rows=1 width=10)
  12. Filter: (id2 = 5)
  13. -> Seq Scan on t02_102082 b (cost=0.00..5.80 rows=1 width=10)
  14. Filter: (id2 = 5)
  15. (11 rows)

发现上面的执行计划与原生的PostgreSQL数据库有所不同。

如果我们join时不使用分布键,会发生什么? 试一试:

  1. postgres=# explain select a.t, b.t from t01 a, t02 b where a.id=b.id2 and a.id2=5;
  2. ERROR: cannot use real time executor with repartition jobs
  3. HINT: Set citus.task_executor_type to "task-tracker".

提示在“real time”执行器下不支持跨节点join,改成“task-tracker”可以支持,试一试:

  1. postgres=# Set citus.task_executor_type to "task-tracker"; SET
  2. postgres=# select a.t, b.t from t01 a, t02 b where a.id=b.id2 and a.id2=5;
  3. t | t
  4. -------+-------
  5. 55555 | 55555
  6. (1 row)
  7. Time: 9843.828 ms

从上面可以看出,可以执行,就是有些慢。

我们建两张广播表:

  1. create table t03(id int, id2 int, t text);
  2. create table t04(id int, id2 int, t text);
  3. select create_reference_table('t03');
  4. select create_reference_table('t04');
  1. postgres=# copy t03 from '/tmp/t01.txt';
  2. COPY 10000
  3. Time: 34.356 ms
  4. postgres=# copy t04 from '/tmp/t01.txt';
  5. COPY 10000
  6. Time: 30.163 ms

两张广播表join一下:

  1. postgres=# select a.t, b.t from t03 a, t04 b where a.id=b.id2 and a.id2=5;
  2. t | t
  3. -------+-------
  4. 55555 | 55555
  5. (1 row)
  6. Time: 3.432 ms
  7. postgres=# explain select a.t, b.t from t03 a, t04 b where a.id=b.id2 and a.id2=5;
  8. QUERY PLAN
  9. ---------------------------------------------------------------------------------------
  10. Distributed Query
  11. Executor: Router
  12. Task Count: 1
  13. Tasks Shown: All
  14. -> Task
  15. Node: host=localhost port=9703 dbname=postgres
  16. -> Hash Join (cost=180.01..372.52 rows=1 width=12)
  17. Hash Cond: (b.id2 = a.id)
  18. -> Seq Scan on t04_102237 b (cost=0.00..155.00 rows=10000 width=10)
  19. -> Hash (cost=180.00..180.00 rows=1 width=10)
  20. -> Seq Scan on t03_102236 a (cost=0.00..180.00 rows=1 width=10)
  21. Filter: (id2 = 5)
  22. (12 rows)
  23. Time: 13.071 ms

两张广播表join没有问题。

试一试,分片表与广播表join一下:

  1. postgres=# select a.t, b.t from t01 a, t03 b where a.id=b.id2 and a.id2=5;
  2. t | t
  3. -------+-------
  4. 55555 | 55555
  5. (1 row)
  6. Time: 4.964 ms
  7. postgres=# explain select a.t, b.t from t01 a, t03 b where a.id=b.id2 and a.id2=5;
  8. QUERY PLAN
  9. -------------------------------------------------------------------------------------
  10. Distributed Query
  11. Executor: Router
  12. Task Count: 1
  13. Tasks Shown: All
  14. -> Task
  15. Node: host=localhost port=9702 dbname=postgres
  16. -> Hash Join (cost=5.81..198.32 rows=1 width=12)
  17. Hash Cond: (b.id2 = a.id)
  18. -> Seq Scan on t03_102236 b (cost=0.00..155.00 rows=10000 width=10)
  19. -> Hash (cost=5.80..5.80 rows=1 width=10)
  20. -> Seq Scan on t01_102178 a (cost=0.00..5.80 rows=1 width=10)
  21. Filter: (id2 = 5)
  22. (12 rows)
  23. Time: 15.099 ms

分片表与广播表join没有问题。

试一下分片表的聚合函数:

  1. postgres=# select id, avg(id2) from t01 group by id limit 5;
  2. id | avg
  3. ------+-----------------------
  4. 2848 | 2848.0000000000000000
  5. 251 | 251.0000000000000000
  6. 3565 | 3565.0000000000000000
  7. 2026 | 2026.0000000000000000
  8. 6158 | 6158.0000000000000000
  9. (5 rows)
  10. Time: 1108.943 ms
  11. postgres=# explain select id, avg(id2) from t01 group by id limit 5;
  12. QUERY PLAN
  13. -----------------------------------------------------------------------------------
  14. Distributed Query into pg_merge_job_0056
  15. Executor: Task-Tracker
  16. Task Count: 32
  17. Tasks Shown: One of 32
  18. -> Task
  19. Node: host=localhost port=9702 dbname=postgres
  20. -> HashAggregate (cost=7.55..10.72 rows=317 width=20)
  21. Group Key: id
  22. -> Seq Scan on t01_102172 t01 (cost=0.00..5.17 rows=317 width=8)
  23. Master Query
  24. -> Limit (cost=0.00..0.00 rows=0 width=0)
  25. -> HashAggregate (cost=0.00..0.00 rows=0 width=0)
  26. Group Key: intermediate_column_56_0
  27. -> Seq Scan on pg_merge_job_0056 (cost=0.00..0.00 rows=0 width=0)
  28. (14 rows)
  29. Time: 17.838 ms

也是没有问题。

好的,先试到这儿,祝大家愉快。

1 评论  
lxcos · 1L · 2018-10-22 13:52:50

不错

添加一条新评论