PostgreSQL 数据同步到ES 搭建操作
(编辑:jimmy 日期: 2025/1/16 浏览:3 次 )
安装 python 和 python-devel 开发包
[root@rtm2 Packages]# rpm -ivh python-devel-2.7.5-58.el7.x86_64.rpm 准备中... ################################# [100%] 正在升级/安装... 1:python-devel-2.7.5-58.el7 ################################# [100%] [root@rtm2 Packages]# ls
安装 multicorn
[root@rtm2 multicorn-1.3.5]# make Python version is 2.7 gcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Wendif-labels -Wmissing-format-attribute -Wformat-security -fno-strict-aliasing -fwrapv -fexcess-precision=standard -O2 -fPIC -I/usr/include/python2.7 -I/usr/include/python2.7 -I. -I./ -I/opt/pgsql-10/include/server -I/opt/pgsql-10/include/internal -D_GNU_SOURCE -c -o src/errors.o src/errors.c gcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Wendif-labels -Wmissing-format-attribute -Wformat-security -fno-strict-aliasing -fwrapv -fexcess-precision=standard -O2 -fPIC -I/usr/include/python2.7 -I/usr/include/python2.7 -I. -I./ -I/opt/pgsql-10/include/server -I/opt/pgsql-10/include/internal -D_GNU_SOURCE -c -o src/python.o src/python.c gcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Wendif-labels -Wmissing-format-attribute -Wformat-security -fno-strict-aliasing -fwrapv -fexcess-precision=standard -O2 -fPIC -I/usr/include/python2.7 -I/usr/include/python2.7 -I. -I./ -I/opt/pgsql-10/include/server -I/opt/pgsql-10/include/internal -D_GNU_SOURCE -c -o src/query.o src/query.c gcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Wendif-labels -Wmissing-format-attribute -Wformat-security -fno-strict-aliasing -fwrapv -fexcess-precision=standard -O2 -fPIC -I/usr/include/python2.7 -I/usr/include/python2.7 -I. 
-I./ -I/opt/pgsql-10/include/server -I/opt/pgsql-10/include/internal -D_GNU_SOURCE -c -o src/multicorn.o src/multicorn.c gcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Wendif-labels -Wmissing-format-attribute -Wformat-security -fno-strict-aliasing -fwrapv -fexcess-precision=standard -O2 -fPIC -shared -o multicorn.so src/errors.o src/python.o src/query.o src/multicorn.o -L/opt/pgsql-10/lib -Wl,--as-needed -Wl,-rpath,'/opt/pgsql-10/lib',--enable-new-dtags -lpthread -ldl -lutil -lm -lpython2.7 -lpthread -ldl -lutil -lm -lpython2.7 -Xlinker -export-dynamic .//preflight-check.sh cp sql/multicorn.sql sql/multicorn--1.3.5.sql [root@rtm2 multicorn-1.3.5]# make install Python version is 2.7 ...
安装pg-es-fdw-master
[root@rtm2 multicorn-1.3.5]# cd ../pg-es-fdw-master [root@rtm2 pg-es-fdw-master]# ls demo.sh dite LICENSE README.md setup.py [root@rtm2 pg-es-fdw-master]# python setup.py build running build running build_py creating build creating build/lib creating build/lib/dite copying dite/__init__.py -> build/lib/dite [root@rtm2 pg-es-fdw-master]# python setup.py install running install running bdist_egg running egg_info creating dite.egg-info writing dite.egg-info/PKG-INFO
安装插件 multicorn
[postgres@rtm2 ~]$ psql psql (10.3) Type "help" for help. postgres=# select * from pg_extension; extname | extowner | extnamespace | extrelocatable | extversion | extconfig | extcondition ---------+----------+--------------+----------------+------------+-----------+-------------- plpgsql | 10 | 11 | f | 1.0 | | (1 row) postgres=# CREATE EXTENSION multicorn; CREATE EXTENSION postgres=# psql postgres=# select * from pg_extension; extname | extowner | extnamespace | extrelocatable | extversion | extconfig | extcondition -----------+----------+--------------+----------------+------------+-----------+-------------- plpgsql | 10 | 11 | f | 1.0 | | multicorn | 10 | 2200 | t | 1.3.5 | | (2 rows) postgres=# CREATE SERVER multicorn_es FOREIGN DATA WRAPPER multicorn OPTIONS(wrapper 'dite.ElasticsearchFDW'); CREATE SERVER postgres=#
配置 es(Elasticsearch)
[root@rtm2 config]# vi elasticsearch.yml node.name: "es-node1" network.host: 192.168.31.121 discovery.zen.ping.unicast.hosts: ["192.168.31.121"]
[root@rtm2 config]# vi /etc/sysctl.conf vm.max_map_count=262144 sysctl -p [root@rtm2 config]# vi /etc/security/limits.conf # End of file root soft nofile 65536 root hard nofile 65536 root soft nproc 4096 root hard nproc 4096 ~
启动es
[root@rtm2 bin]# ls elasticsearch elasticsearch.in.bat elasticsearch-service-mgr.exe elasticsearch-service-x86.exe plugin.bat elasticsearch.bat elasticsearch.in.sh elasticsearch-service-x64.exe plugin service.bat [root@rtm2 bin]# ./bin/elasticsearch
test=# CREATE FOREIGN TABLE pp_es (id bigint,age bigint) SERVER multicorn_es OPTIONS (host test(# '192.168.31.121', port '9200', node 'es-node1', index 'pp'); CREATE FOREIGN TABLE test=#
创建触发器和外部表
test=# CREATE OR REPLACE FUNCTION index_pp() RETURNS trigger AS $def$ test$# BEGIN test$# INSERT INTO pp_es (id, age) VALUES test$# (NEW.id, NEW.age); test$# RETURN NEW; test$# END; test$# $def$ LANGUAGE plpgsql; CREATE FUNCTION test=# CREATE TRIGGER es_insert_pp AFTER INSERT ON pp FOR EACH ROW EXECUTE PROCEDURE index_pp(); CREATE TRIGGER test=#
新增数据测试
test=# insert into pp (id,age) values (1,11); INSERT 0 1 test=# select * from pp; id | age ----+----- 1 | 11 (1 row) test=#
检查es数据
[root@rtm2 ~]# curl 'http://192.168.31.121:9200/es-node1/_search?pretty' { "took" : 104, "timed_out" : false, "_shards" : { "total" : 5, "successful" : 5, "failed" : 0 }, "hits" : { "total" : 2, "max_score" : 1.0, "hits" : [ { "_index" : "es-node1", "_type" : "pp", "_id" : "1", "_score" : 1.0, "_source":{"age": "11"} }, { "_index" : "es-node1", "_type" : "pp", "_id" : "2", "_score" : 1.0, "_source":{"age": "22"} } ] } } [root@rtm2 ~]#
创建更新触发器
test=# CREATE OR REPLACE FUNCTION updadeIndex_pp() RETURNS trigger AS $def$ BEGIN UPDATE pp_es SET id = NEW.id, age = NEW.age where id =NEW.id; RETURN NEW; END; $def$ LANGUAGE plpgsql; CREATE FUNCTION test=# ^C test=# test=# CREATE TRIGGER es_update_pp AFTER UPDATE OF id, age ON pp FOR EACH ROW WHEN (OLD.* IS DISTINCT test(# FROM NEW.*)EXECUTE PROCEDURE updadeIndex_pp(); CREATE TRIGGER test=#
更新表数据
test=# select * from pp; id | age ----+----- 1 | 11 2 | 22 3 | 22 (3 rows) test=# update pp a set a.age = 33 where a.id = 3; ERROR: column "a" of relation "pp" does not exist LINE 1: update pp a set a.age = 33 where a.id = 3; ^ test=# update pp set age = 33 where id = 3; UPDATE 1 test=# select * from pp; id | age ----+----- 1 | 11 2 | 22 3 | 33 (3 rows) test=#
es查询变更
[root@rtm2 ~]# curl 'http://192.168.31.121:9200/es-node1/_search?pretty' { "took" : 4, "timed_out" : false, "_shards" : { "total" : 5, "successful" : 5, "failed" : 0 }, "hits" : { "total" : 3, "max_score" : 1.0, "hits" : [ { "_index" : "es-node1", "_type" : "pp", "_id" : "1", "_score" : 1.0, "_source":{"age": "11"} }, { "_index" : "es-node1", "_type" : "pp", "_id" : "2", "_score" : 1.0, "_source":{"age": "22"} }, { "_index" : "es-node1", "_type" : "pp", "_id" : "3", "_score" : 1.0, "_source":{"age": "33"} } ] } } [root@rtm2 ~]#
补充:logstash同步pgsql数据到Elasticsearch
一、对于logstash的配置我就不再多说,主要是三部分:input、filter、output的配置
二、配置步骤
1、input配置
input { stdin { } jdbc { jdbc_connection_string => "jdbc:postgresql://127.0.0.1:5432/world" jdbc_user => "postgres" jdbc_password => "zhang123" jdbc_driver_library => "D:\logstash-6.4.0\bin\pgsql\postgresql-42.2.5.jar" jdbc_driver_class => "org.postgresql.Driver" jdbc_paging_enabled => "true" jdbc_page_size => "300000" use_column_value => "true" tracking_column => "id" statement_filepath => "D:\logstash-6.4.0\bin\pgsql\jdbc.sql" schedule => "* * * * *" type => "jdbc" jdbc_default_timezone =>"Asia/Shanghai" } }
2、filter配置
filter { json { source => "message" remove_field => ["message"] } }
3、output 配置,就是elasticsearch的基本配置
output { elasticsearch { hosts => ["localhost:9200"] index => "test_out" template => "D:\logstash-6.4.0\bin\pgsql\es-template.json" template_name => "t-statistic-out-logstash" template_overwrite => true document_type => "out" document_id => "%{id}" } stdout { codec => json_lines } }
以上就是整个logstash 的jdbc.conf
4、es-template.json的配置
{ "template" : "t-statistis-out-template", "order":1, "settings": { "index": { "refresh_interval": "5s" } }, "mappings": { "_default_": { "_all" : {"enabled":false}, "dynamic_templates": [ { "message_field" : { "match" : "message", "match_mapping_type" : "string", "mapping" : { "type" : "string", "index" : "not_analyzed" } } }, { "string_fields" : { "match" : "*", "match_mapping_type" : "string", "mapping" : { "type" : "string", "index" : "not_analyzed" } } } ], "properties": { "@timestamp": { "type": "date" }, "@version": { "type": "keyword" }, "id": { "type": "keyword" }, "name": { "type": "keyword" }, "pp": { "type": "keyword" } } } }, "aliases": {} }
最后就是下载好pgsql的连接驱动(可以在官网下载),并配置好自己的数据库表格的数据
启动命令:进入logstash的bin目录执行下面的命令;我自己的logstash配置都放在bin下的pgsql目录里(这个目录可以随意选择位置创建)
logstash.bat -f ./pgsql/jdbc.conf
以上为个人经验,希望能给大家一个参考,也希望大家多多支持。如有错误或未考虑完全的地方,望不吝赐教。
下一篇:postgresql删除主键的操作