当前位置:首页 > 数据库 > 正文内容

mysql数据导入es

root2年前 (2021-11-11)数据库458

将mysql数据取出放到elasticsearch中


from datetime import datetime
from elasticsearch import Elasticsearch
import pymysql
import time
import json
from elasticsearch.helpers import bulk, streaming_bulk
import sys

from multiprocessing import Pool
tb_name = "test"
# es的索引
es_index = "index_" + tb_name
es_type = "type_" + tb_name

def db2es(x):
    i,j=x
    db = pymysql.connect("127.0.0.1", "root", "123456", "User", charset='utf8')
    es = Elasticsearch([{"host": "127.0.0.1", "port": 9200}], timeout=60, max_retries=3, retry_on_timeout=True)
    cursor = db.cursor()
    while True:
        print(time.strftime('%Y-%m-%d %H:%M:%S --> ') + str(i))
        sql = '''SELECT
                            id,
                            name,
                            age
                        FROM
                            ''' + tb_name + ''' limit %s, 100000;''' % i

        cursor.execute(sql)
        rows = cursor.fetchall()
        action = []
        if rows:
            for row in rows:
                (id, name, age) = row

                action.append({
                    "_index": es_index,
                    "_type": es_type,
                    "_id": id,
                    "_source": {
                        "name":name,
                        "age":age
                    }
                })
            # 导入es
            bulk(es, action)
            del action[0:len(action)]
            i = i + 100000
            if i >= j:
                print(i)
                break
        else:
            break
    print(time.strftime('%Y-%m-%d %H:%M:%S --> '), i, j)
    print(tb_name + " done")
    db.close()

if __name__ == '__main__':
	# 多进程运行
    pool = Pool(processes=8)
    db = pymysql.connect("127.0.0.1", "root", "123456", "User", charset='utf8')
    cursor = db.cursor()
    sql = "SELECT COUNT(*) FROM test;"
    cursor.execute(sql)
    rows = cursor.fetchone()[0]
    db.close()
    args=[(i, i+5000000 if rows>(i+5000000)else rows) for i in range(0,rows,5000000)]
    pool.map(db2es,args)
    pool.close()
    pool.join()


扫描二维码推送至手机访问。

版权声明:本文由一叶知秋发布,如需转载请注明出处。

本文链接:https://zhiqiu.top/?id=173

分享给朋友:

相关文章

curl 访问es 常用的命令

查询有哪些索引curl -X GET 'http://10.0.0.143:8200/_cat/indices'查询索引的别名curl  -XGET 'http://127.0.0.1:8200/index...

dataX:超强的数据库数据互导工具

安装下载安装包http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz 然后解压进入解压后文件的./bin自检命令python  datax.py ../j...

mysql alter、index的操作

删除字段alter table user drop name;添加字段alter table user add address varchar(255...

mysql 重置表

truncate table 表名...

clickhouse 分区、分片简单理解

clickhouse 分区、分片简单理解

分区是表的分区,具体的DDL操作关键词是 PARTITION BY,指的是一个表按照某一列数据(比如日期)进行分区,对应到最终的结果就是不同分区的数据会写入不同的文件中。分片复用了数据库的分区,相当于在原有的分区下,作为第二层分区...

Mysql占用swap空间,导致mysql性能大幅降低

在Linux下,SWAP的作用类似Windows系统下的“虚拟内存”。当物理内存不足时,拿出部分硬盘空间当SWAP分区(虚拟成内存)使用,从而解决内存容量不足的情况。SWAP意思是交换,顾名思义,当某进程向OS请求内存发现不足时,OS会把内...