当前位置:首页 > 数据库 > 正文内容

mysql数据导入es

root2年前 (2021-11-11)数据库586

将mysql数据取出放到elasticsearch中


from datetime import datetime
from elasticsearch import Elasticsearch
import pymysql
import time
import json
from elasticsearch.helpers import bulk, streaming_bulk
import sys

from multiprocessing import Pool
tb_name = "test"
# es的索引
es_index = "index_" + tb_name
es_type = "type_" + tb_name

def db2es(x):
    i,j=x
    db = pymysql.connect("127.0.0.1", "root", "123456", "User", charset='utf8')
    es = Elasticsearch([{"host": "127.0.0.1", "port": 9200}], timeout=60, max_retries=3, retry_on_timeout=True)
    cursor = db.cursor()
    while True:
        print(time.strftime('%Y-%m-%d %H:%M:%S --> ') + str(i))
        sql = '''SELECT
                            id,
                            name,
                            age
                        FROM
                            ''' + tb_name + ''' limit %s, 100000;''' % i

        cursor.execute(sql)
        rows = cursor.fetchall()
        action = []
        if rows:
            for row in rows:
                (id, name, age) = row

                action.append({
                    "_index": es_index,
                    "_type": es_type,
                    "_id": id,
                    "_source": {
                        "name":name,
                        "age":age
                    }
                })
            # 导入es
            bulk(es, action)
            del action[0:len(action)]
            i = i + 100000
            if i >= j:
                print(i)
                break
        else:
            break
    print(time.strftime('%Y-%m-%d %H:%M:%S --> '), i, j)
    print(tb_name + " done")
    db.close()

if __name__ == '__main__':
	# 多进程运行
    pool = Pool(processes=8)
    db = pymysql.connect("127.0.0.1", "root", "123456", "User", charset='utf8')
    cursor = db.cursor()
    sql = "SELECT COUNT(*) FROM test;"
    cursor.execute(sql)
    rows = cursor.fetchone()[0]
    db.close()
    args=[(i, i+5000000 if rows>(i+5000000)else rows) for i in range(0,rows,5000000)]
    pool.map(db2es,args)
    pool.close()
    pool.join()


扫描二维码推送至手机访问。

版权声明:本文由一叶知秋发布,如需转载请注明出处。

本文链接:https://zhiqiu.top/?id=173

分享给朋友:

相关文章

dataX:超强的数据库数据互导工具

安装下载安装包http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz 然后解压进入解压后文件的./bin自检命令python  datax.py ../j...

clickhouse 搭建

通过docker 部署clickhousedocker-compose文件内容如下:services:     ipwave-clickhouse:     ...

被Navicat坑哭的日常,版本问题

mysql5.7DROP TABLE IF EXISTS `xxx_copy1`;CREATE TABLE `xxx_copy1`  (  `id` int(11) UNSIGNED NOT NULL AUTO_INCR...

postgresql 导入导出sql 文件

pg_dump  -h localhost -U postgres -t tablename databasename >./test.sql导出 -t 表名  psql -d test1 -U...

centos7 快速搭建 mysql

https://blog.csdn.net/qq_36582604/article/details/80526287...

postgresql修改数据存储位置

postgresql修改数据存储位置

最近公司提供了一台新的服务器,同时有一个盘是ssd。同时为了测试postgresql的性能,将数据放置到ssd上系统centos7.9 数据库postgresql-9.6首先停掉数据库systemctl stop pos...