当前位置:首页 > 数据库 > 正文内容

mysql数据导入es

root3年前 (2021-11-11)数据库910

将mysql数据取出放到elasticsearch中


from datetime import datetime
from elasticsearch import Elasticsearch
import pymysql
import time
import json
from elasticsearch.helpers import bulk, streaming_bulk
import sys

from multiprocessing import Pool
tb_name = "test"
# es的索引
es_index = "index_" + tb_name
es_type = "type_" + tb_name

def db2es(x):
    i,j=x
    db = pymysql.connect("127.0.0.1", "root", "123456", "User", charset='utf8')
    es = Elasticsearch([{"host": "127.0.0.1", "port": 9200}], timeout=60, max_retries=3, retry_on_timeout=True)
    cursor = db.cursor()
    while True:
        print(time.strftime('%Y-%m-%d %H:%M:%S --> ') + str(i))
        sql = '''SELECT
                            id,
                            name,
                            age
                        FROM
                            ''' + tb_name + ''' limit %s, 100000;''' % i

        cursor.execute(sql)
        rows = cursor.fetchall()
        action = []
        if rows:
            for row in rows:
                (id, name, age) = row

                action.append({
                    "_index": es_index,
                    "_type": es_type,
                    "_id": id,
                    "_source": {
                        "name":name,
                        "age":age
                    }
                })
            # 导入es
            bulk(es, action)
            del action[0:len(action)]
            i = i + 100000
            if i >= j:
                print(i)
                break
        else:
            break
    print(time.strftime('%Y-%m-%d %H:%M:%S --> '), i, j)
    print(tb_name + " done")
    db.close()

if __name__ == '__main__':
	# 多进程运行
    pool = Pool(processes=8)
    db = pymysql.connect("127.0.0.1", "root", "123456", "User", charset='utf8')
    cursor = db.cursor()
    sql = "SELECT COUNT(*) FROM test;"
    cursor.execute(sql)
    rows = cursor.fetchone()[0]
    db.close()
    args=[(i, i+5000000 if rows>(i+5000000)else rows) for i in range(0,rows,5000000)]
    pool.map(db2es,args)
    pool.close()
    pool.join()


扫描二维码推送至手机访问。

版权声明:本文由一叶知秋发布,如需转载请注明出处。

本文链接:https://zhiqiu.top/?id=173

分享给朋友:

相关文章

被Navicat坑哭的日常,版本问题

mysql5.7DROP TABLE IF EXISTS `xxx_copy1`;CREATE TABLE `xxx_copy1`  (  `id` int(11) UNSIGNED NOT NULL AUTO_INCR...

ES 修改 查询最大行数

curl  -XPUT  http://ip:port/your_index/_settings?preserve_existing=true  -H  'Content-Type: &nbs...

清空postgresql的缓存

系统:centos,版本:postgresql-9.6因为要测试postgresql的性能,当多次查询的时候查询结果会因为缓存用时很短,不能模拟出现实使用的场景。因此需要清除缓存。首先stop掉postgresqlsystemctl sto...

clickhouse 分区、分片简单理解

clickhouse 分区、分片简单理解

分区是表的分区,具体的DDL操作关键词是 PARTITION BY,指的是一个表按照某一列数据(比如日期)进行分区,对应到最终的结果就是不同分区的数据会写入不同的文件中。分片复用了数据库的分区,相当于在原有的分区下,作为第二层分区...

mysql 临时表和复制表

创建临时表CREATE TEMPORARY TABLE SalesSummary  (product_name VARCHAR(50) NOT NULL, ...

mysql 重置表

truncate table 表名...