盒子
盒子

python进行Elasticsearch批量索引

本文分享如何使用 Python 脚本对 Elasticsearch 进行批量索引。

之前主要采用 jdbc-river 的方式将数据从 MySQL 索引到 Elasticsearch,但由于有些数据表中含有各种特殊字符,导入时总是报各种错误,于是自己写了一个 Python 脚本来导入数据,这里列出其中的主要函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# Get the number of rows in a table.
def getTablesCounts(_cursor, _tablename):
    """Run ``select count(*)`` against `_tablename` and return the result row.

    Args:
        _cursor: DB-API style cursor used to execute the query.
        _tablename: Name of the table to count. It is interpolated into the
            SQL text inside backticks (identifiers cannot be bound as query
            parameters), so it must come from a trusted source.

    Returns:
        Whatever ``_cursor.fetchone()`` yields for the count query. With a
        tuple-returning cursor this is a one-element row such as ``(42,)``,
        not a bare integer, so callers need to unwrap it.
    """
    _querySQL = 'select count(*) from `%s`' % _tablename
    _cursor.execute(_querySQL)
    return _cursor.fetchone()

# Fetch a single row from the database by its id.
def getFromId(_cursor, _id, _tablename):
    """Return the row of `_tablename` whose ``id`` column equals `_id`.

    Args:
        _cursor: DB-API style cursor used to execute the query.
        _id: Value compared against the ``id`` column.
        _tablename: Name of the table to read from.

    Returns:
        The result of ``_cursor.fetchone()`` for the query; DB-API cursors
        return ``None`` from ``fetchone()`` when no row matched.

    Note:
        Both `_tablename` and `_id` are interpolated directly into the SQL
        text rather than bound as parameters, so callers must only pass
        trusted values.
    """
    _querySQL = "select * from `%s` where id = %s " % (_tablename, _id)
    _cursor.execute(_querySQL)
    return _cursor.fetchone()

# Import the data into ES; actions are sent through the bulk helper in
# batches (500000 actions per batch by default).
def bulkData(_cur, _tableCounts, _batchSize=500000):
    """Index rows with ids ``1 .. _tableCounts`` into Elasticsearch in batches.

    Each id is looked up with ``getFromId`` on the module-level ``tablename``
    and turned into one bulk action targeting the module-level ``indexname``
    and ``typename``, with the row id as the document ``_id``. Row positions
    1-6 are stored as ``column1`` .. ``column6``.

    Args:
        _cur: Database cursor passed through to ``getFromId``.
        _tableCounts: Highest id to import. Either a number or a row such as
            ``(n,)`` as returned by ``getTablesCounts``; a row is unwrapped
            to its first element.
        _batchSize: Number of actions buffered before they are flushed with
            ``helpers.bulk``. Defaults to 500000, the original fixed value.

    Note:
        The loop walks ids sequentially, so rows whose id is greater than
        `_tableCounts` are never visited. Ids for which no row is found are
        skipped instead of aborting the import.
    """
    es = Elasticsearch("192.168.88.88")

    count = _tableCounts
    # fetchone() on a count query yields a row like (n,); comparing an int
    # with that tuple would never terminate the loop correctly.
    if isinstance(count, (tuple, list)):
        count = count[0]

    actions = []
    j = 0
    while j < count:
        j += 1
        oneRec = getFromId(_cur, j, tablename)
        if oneRec is None:
            # No row with this id (gap in the id sequence): nothing to index.
            continue
        actions.append({
            "_index": indexname,
            "_type": typename,
            "_id": j,
            "_source": {
                "column1": oneRec[1],
                "column2": oneRec[2],
                "column3": oneRec[3],
                "column4": oneRec[4],
                "column5": oneRec[5],
                "column6": oneRec[6]
            }
        })
        if len(actions) >= _batchSize:
            # Progress marker: the id reached when this batch is flushed.
            print(j)
            helpers.bulk(es, actions)
            del actions[:]

    # Flush whatever is left over after the last full batch.
    if actions:
        helpers.bulk(es, actions)

【参考文章】