当前位置:首页 > 数据库 > 正文内容

mysql数据导入es

root3年前 (2021-11-11)数据库610

将mysql数据取出放到elasticsearch中


from datetime import datetime
from elasticsearch import Elasticsearch
import pymysql
import time
import json
from elasticsearch.helpers import bulk, streaming_bulk
import sys

from multiprocessing import Pool
tb_name = "test"
# es的索引
es_index = "index_" + tb_name
es_type = "type_" + tb_name

def db2es(x):
    i,j=x
    db = pymysql.connect("127.0.0.1", "root", "123456", "User", charset='utf8')
    es = Elasticsearch([{"host": "127.0.0.1", "port": 9200}], timeout=60, max_retries=3, retry_on_timeout=True)
    cursor = db.cursor()
    while True:
        print(time.strftime('%Y-%m-%d %H:%M:%S --> ') + str(i))
        sql = '''SELECT
                            id,
                            name,
                            age
                        FROM
                            ''' + tb_name + ''' limit %s, 100000;''' % i

        cursor.execute(sql)
        rows = cursor.fetchall()
        action = []
        if rows:
            for row in rows:
                (id, name, age) = row

                action.append({
                    "_index": es_index,
                    "_type": es_type,
                    "_id": id,
                    "_source": {
                        "name":name,
                        "age":age
                    }
                })
            # 导入es
            bulk(es, action)
            del action[0:len(action)]
            i = i + 100000
            if i >= j:
                print(i)
                break
        else:
            break
    print(time.strftime('%Y-%m-%d %H:%M:%S --> '), i, j)
    print(tb_name + " done")
    db.close()

if __name__ == '__main__':
	# 多进程运行
    pool = Pool(processes=8)
    db = pymysql.connect("127.0.0.1", "root", "123456", "User", charset='utf8')
    cursor = db.cursor()
    sql = "SELECT COUNT(*) FROM test;"
    cursor.execute(sql)
    rows = cursor.fetchone()[0]
    db.close()
    args=[(i, i+5000000 if rows>(i+5000000)else rows) for i in range(0,rows,5000000)]
    pool.map(db2es,args)
    pool.close()
    pool.join()


扫描二维码推送至手机访问。

版权声明:本文由一叶知秋发布,如需转载请注明出处。

本文链接:https://zhiqiu.top/?id=173

分享给朋友:

相关文章

mysql 重置表

truncate table 表名...

清除mysql 查询缓存

对一条sql进行优化时,发现原本很慢的一条sql(将近1分钟) 在第二次运行时, 瞬间就完成了(0.00sec) 这是因为mysql对同一条sql进行了缓存,服务器直接从上次的查询结果缓存中读取数据,而不是重新分析...

clickhouse 搭建

通过docker 部署clickhousedocker-compose文件内容如下:services:     ipwave-clickhouse:     ...

clickhouse 分区、分片简单理解

clickhouse 分区、分片简单理解

分区是表的分区,具体的DDL操作关键词是 PARTITION BY,指的是一个表按照某一列数据(比如日期)进行分区,对应到最终的结果就是不同分区的数据会写入不同的文件中。分片复用了数据库的分区,相当于在原有的分区下,作为第二层分区...

postgresql 查看数据库、表的大小

查看数据库的大小 select pg_database_size('test'); select pg_size_pretty(pg_database_size('test');查看单...

mysql like 模糊查询和REGEXP 正则查询

like%:表示任意个或多个字符。可匹配任意类型和长度的字符。_:表示任意单个字符。匹配单个任意字符,它常用来限制表达式的字符长度语句:(可以代表一个中文字符)匹配”三”字结尾select * from use...