postgres数据同步到elasticsearch

  • Post author:
  • Post category:其他


公司的题库系统数据量较大,通常用户会对题目题干进行模糊查询,postgres的like查询效率低下,考虑采用elasticsearch优化查询方案。


参考

https://www.jianshu.com/p/629f698a7c58

环境如下:
  • mac
  • postgres 10.2
  • python 2.7.13
  • es 6.3.1
运行步骤,虽然按照简书上的一步一步来,但是还是遇到了坑,再次详细记录一下。
1. 官网下载 ElasticSearch ,解压缩,以下命令运行:
bin/elasticsearch
2. 安装 PostgreSQL 插件 multicorn
git clone https://github.com/Kozea/Multicorn /tmp/multicorn
cd $_

由于 Multicorn 的master 代码在 OS X 环境有问题,有几个步骤要手工执行:

(1)修改文件 Makefile 的93行,将 darwin 的首字母改成大写:Darwin

(2)执行 make

(3)sudo ARCHFLAGS=”-arch x86_64” make install

(4)执行 make install

3. 连接postgres,选择数据库,并创建一个 EXTENSION
create extension multicorn ;
注意,你的用户必须为superuser,否则创建失败。
4. 安装 PostgreSQL 插件 pg-es-fdw
git clone https://github.com/Mikulas/pg-es-fdw /tmp/pg-es-fdw
cd $_
sudo python setup.py install
5. 基于 multicorn 创建 foreign server
CREATE SERVER multicorn_es FOREIGN DATA WRAPPER multicorn
OPTIONS (
  wrapper 'dite.ElasticsearchFDW'
);
注意,可能提示python ImportError,退出postgres,重连
6. 创建表,我是已存在的表,表大致如下
CREATE TABLE t_question (
    q_id serial PRIMARY KEY,
    q_title text NOT NULL
);
7. 创建外部表
CREATE FOREIGN TABLE t_question_es (
    id bigint,
    title text
) SERVER multicorn_es OPTIONS (host '127.0.0.1', port '9200', node 'test', index 'question');
此处有两个坑
1). 当你访问

http://localhost:9200/test/question/

{id},他会提示你

index_not_found

,使用postman,put形式访问

http://localhost:9200/test

创建索引。
2). 不要将t_question_es的id改为q_id(我习惯性改为了q_id),导致后面数据同步时报没有id列的错误。
8. 创建触发器

对实体表,创建触发器函数,在用户对实体表插入,删除,更新时,通过触发器函数自动将数据同步到对应ES的外部表。同步过程调用 FDW 的接口,对 ES 进行索引的建立,更新,删除:

CREATE OR REPLACE FUNCTION index_question() RETURNS trigger AS $def$
    BEGIN
        INSERT INTO t_question_es (id, title) VALUES
            (NEW.q_id, NEW.q_title);
        RETURN NEW;
    END;
$def$ LANGUAGE plpgsql;

CREATE OR REPLACE FUNCTION reindex_question() RETURNS trigger AS $def$
    BEGIN
        UPDATE t_question_es SET
            title = NEW.q_title
        WHERE id = NEW.q_id;
        RETURN NEW;
    END;
$def$ LANGUAGE plpgsql;

CREATE OR REPLACE FUNCTION delete_question() RETURNS trigger AS $def$
    BEGIN
        DELETE FROM t_question_es a WHERE a.id = OLD.q_id;
        RETURN OLD;
    END;
$def$ LANGUAGE plpgsql;

CREATE TRIGGER es_insert_question
    AFTER INSERT ON t_question
    FOR EACH ROW EXECUTE PROCEDURE index_question();

CREATE TRIGGER es_update_question
    AFTER UPDATE OF q_title ON t_question
    FOR EACH ROW
    WHEN (OLD.* IS DISTINCT FROM NEW.*)
    EXECUTE PROCEDURE reindex_question();

CREATE TRIGGER es_delete_question
    BEFORE DELETE ON t_question
    FOR EACH ROW EXECUTE PROCEDURE delete_question();
9. 测试插入数据
insert into t_question_es(id, title) values (1, 'aaaaa');
10. 查询
http://localhost:9200/test/question/1
11. 删除
delete from t_question_es where id=1;
12. 查询
http://localhost:9200/test/question/1
13. 同步正式数据
insert into t_question_es(id, title) select
 q_id, q_title from t_question;
14. 测试搜索
http://localhost:9200/test/question/_search?q=*:*&pretty
注意,如果你同时测试了test索引下面创建了两个表,那么,你在插入的时候会有问题,删除索引,重新创建索引并导入数据。



版权声明:本文为qq_31343581原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。