Importing MySQL data into Elasticsearch
Pull the data out of MySQL and bulk-index it into Elasticsearch.
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from multiprocessing import Pool
import pymysql
import time

tb_name = "test"
# Elasticsearch index/type names
es_index = "index_" + tb_name
es_type = "type_" + tb_name  # _type is only needed on ES 6.x and earlier


def db2es(x):
    """Copy rows [i, j) of the table into Elasticsearch, 100k rows per batch."""
    i, j = x
    db = pymysql.connect(host="127.0.0.1", user="root", password="123456",
                         database="User", charset="utf8")
    es = Elasticsearch([{"host": "127.0.0.1", "port": 9200}],
                       timeout=60, max_retries=3, retry_on_timeout=True)
    cursor = db.cursor()
    while True:
        print(time.strftime('%Y-%m-%d %H:%M:%S --> ') + str(i))
        sql = "SELECT id, name, age FROM " + tb_name + " LIMIT %s, 100000"
        cursor.execute(sql, (i,))
        rows = cursor.fetchall()
        if not rows:
            break
        actions = []
        for row_id, name, age in rows:
            actions.append({
                "_index": es_index,
                "_type": es_type,
                "_id": row_id,
                "_source": {
                    "name": name,
                    "age": age,
                },
            })
        # bulk-load this batch into Elasticsearch
        bulk(es, actions)
        i += 100000
        if i >= j:
            print(i)
            break
    print(time.strftime('%Y-%m-%d %H:%M:%S --> '), i, j)
    print(tb_name + " done")
    db.close()


if __name__ == '__main__':
    # count the rows once, then split the table into 5M-row ranges
    db = pymysql.connect(host="127.0.0.1", user="root", password="123456",
                         database="User", charset="utf8")
    cursor = db.cursor()
    cursor.execute("SELECT COUNT(*) FROM " + tb_name)
    total = cursor.fetchone()[0]
    db.close()

    args = [(i, min(i + 5000000, total)) for i in range(0, total, 5000000)]
    # hand each 5M-row range to a worker, up to 8 processes in parallel
    pool = Pool(processes=8)
    pool.map(db2es, args)
    pool.close()
    pool.join()
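One caveat with paging by LIMIT offset, count: MySQL scans and discards offset rows on every query, so batches get progressively slower the deeper into a large table a worker is. If id is an auto-increment primary key, keyset pagination (seeking on the key instead of an offset) avoids that, and streaming_bulk can replace bulk to send documents in chunks with per-document results instead of buffering a whole batch. Below is a minimal sketch of that variant, assuming the same table, credentials, and index names as the script above; db2es_keyset, the batch size, and the chunk size are illustrative choices, not part of the original script.

from elasticsearch import Elasticsearch
from elasticsearch.helpers import streaming_bulk
import pymysql

ES_INDEX = "index_test"  # same index/type names as the script above
ES_TYPE = "type_test"


def db2es_keyset(last_id, max_id, batch=100000):
    """Copy rows with last_id < id <= max_id, seeking on the primary key."""
    db = pymysql.connect(host="127.0.0.1", user="root", password="123456",
                         database="User", charset="utf8")
    es = Elasticsearch([{"host": "127.0.0.1", "port": 9200}],
                       timeout=60, max_retries=3, retry_on_timeout=True)
    cursor = db.cursor()
    while last_id < max_id:
        # WHERE id > ? walks the primary-key index directly, so each batch
        # costs the same no matter how deep into the table we are.
        cursor.execute(
            "SELECT id, name, age FROM test"
            " WHERE id > %s AND id <= %s ORDER BY id LIMIT %s",
            (last_id, max_id, batch))
        rows = cursor.fetchall()
        if not rows:
            break
        actions = ({"_index": ES_INDEX, "_type": ES_TYPE, "_id": r[0],
                    "_source": {"name": r[1], "age": r[2]}} for r in rows)
        # streaming_bulk yields a per-document (ok, result) tuple and sends
        # documents in chunks rather than holding the whole batch in memory.
        for ok, item in streaming_bulk(es, actions, chunk_size=1000):
            if not ok:
                print("failed:", item)
        last_id = rows[-1][0]  # resume after the last id we indexed
    db.close()

With this variant, the ranges handed to Pool.map would be (last_id, max_id) pairs over the id space rather than row offsets, so every worker's per-batch cost stays flat regardless of its position in the table.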