Automatically killing slow queries in ElasticSearch

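With MySQL, to keep a few bad SQL statements from bringing the database down, people usually run pt-kill or write their own script: run show full processlist, pick out the matching connection IDs by some rule (for example, a select that has been running for more than N seconds), and then kill those sessions.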
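
For reference, the selection rule of such a home-made MySQL script can be sketched in a few lines of Python. This is only an illustration: the function names `pick_slow_selects` and `build_kill_statements` and the sample rows are made up here, and the rows are assumed to be dictionaries keyed by the column names that show full processlist returns (Id, Command, Time, Info). Fetching the rows and executing the statements is left out.

```python
def pick_slow_selects(processlist_rows, max_seconds):
    """Return the connection ids of SELECTs that have run for max_seconds or longer."""
    slow_ids = []
    for row in processlist_rows:
        # Info holds the statement text and is NULL for idle connections.
        sql = (row.get("Info") or "").lstrip().lower()
        is_query = row.get("Command") == "Query"
        if is_query and sql.startswith("select") and int(row.get("Time") or 0) >= max_seconds:
            slow_ids.append(row["Id"])
    return slow_ids


def build_kill_statements(connection_ids):
    """Turn connection ids into KILL statements (not executed here)."""
    return [f"KILL {int(cid)}" for cid in connection_ids]


# Hypothetical rows, shaped like the output of show full processlist.
sample_rows = [
    {"Id": 11, "Command": "Sleep", "Time": 300, "Info": None},
    {"Id": 12, "Command": "Query", "Time": 42, "Info": "SELECT * FROM big_table"},
    {"Id": 13, "Command": "Query", "Time": 1, "Info": "select 1"},
    {"Id": 14, "Command": "Query", "Time": 90, "Info": "UPDATE t SET a = 1"},
]

print(build_kill_statements(pick_slow_selects(sample_rows, 10)))  # ['KILL 12']
```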

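ES has the same need. Here is a simple approach:

1. Find the currently active tasks.

2. Check whether each active task meets the kill criteria, and cancel it if it does.

The code is as follows: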

import time
import datetime
import requests


def cancel_request(url, request_id):
    """Ask ES to cancel the task identified by node_id:task_id."""
    try:
        response = requests.post(f"{url}/_tasks/{request_id}/_cancel", timeout=10)
    except requests.RequestException as exc:
        print(f"Failed to cancel request {request_id}: {exc}")
        return
    if response.status_code == 200:
        print(f"Request {request_id} cancelled successfully.")
    else:
        print(f"Failed to cancel request {request_id}.")


def get_active_requests(url):
    """Return the running search tasks grouped by node, or None on failure."""
    try:
        response = requests.get(f"{url}/_tasks?detailed=true&actions=*search", timeout=10)
    except requests.RequestException as exc:
        print(f"Failed to fetch active requests: {exc}")
        return None
    if response.status_code == 200:
        return response.json()
    print("Failed to fetch active requests.")
    return None


def main(url, threshold):
    print(f"---- check es slow query at {datetime.datetime.now()} -----")
    active_requests = get_active_requests(url)
    if not active_requests:
        return  # nothing could be fetched this round; try again next time

    for node in active_requests.get('nodes', {}).values():
        for task in node.get('tasks', {}).values():
            run_seconds = task['running_time_in_nanos'] / 1_000_000_000
            action = task['action']
            start_time = str(datetime.datetime.fromtimestamp(task['start_time_in_millis'] / 1000))
            description = task.get('description', '')

            # The cancel API expects the task id in the form node_id:task_id
            request_id = f"{task['node']}:{task['id']}"

            # The notification could also be sent out through IM
            # msg_content = "## ELK slow query kill notice\n\n" + "- Start time: " + str(start_time) + "\n\n- Seconds running: " + str(run_seconds) + "\n\n- Query: " + str(description)

            # Only care about search requests that have run for at least threshold seconds
            if run_seconds >= int(threshold) and action == 'indices:data/read/search':
                print("--- Slow query found, cancelling it ---")
                cancel_request(url, request_id)
                # print(msg_content)


if __name__ == '__main__':
    url = "http://192.168.31.181:9200"
    while True:
        main(url, 5)  # a low threshold, to make the demo easier
        time.sleep(1)
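
The selection step can also be pulled out into a pure function, so that it can be checked against a hand-written response without touching a cluster. The sketch below is one possible way to do that, not part of the original script: the name `find_slow_searches` and the sample response are made up, and the sample only mimics the fields the script reads from `_tasks` (node, id, action, running_time_in_nanos).

```python
def find_slow_searches(tasks_response, threshold_seconds):
    """Return (task_id, run_seconds) for top-level searches at or over the threshold."""
    slow = []
    for node in (tasks_response or {}).get("nodes", {}).values():
        for task in node.get("tasks", {}).values():
            run_seconds = task["running_time_in_nanos"] / 1_000_000_000
            # Same rule as the script: exact action match plus a time threshold.
            if task["action"] == "indices:data/read/search" and run_seconds >= threshold_seconds:
                slow.append((f"{task['node']}:{task['id']}", run_seconds))
    return slow


# Hypothetical response, trimmed to the fields used above.
sample_response = {
    "nodes": {
        "node-a": {
            "tasks": {
                "node-a:101": {"node": "node-a", "id": 101,
                               "action": "indices:data/read/search",
                               "running_time_in_nanos": 7_500_000_000},
                "node-a:102": {"node": "node-a", "id": 102,
                               "action": "indices:data/read/search[phase/query]",
                               "running_time_in_nanos": 7_400_000_000},
                "node-a:103": {"node": "node-a", "id": 103,
                               "action": "indices:data/read/search",
                               "running_time_in_nanos": 1_000_000_000},
            }
        }
    }
}

print(find_slow_searches(sample_response, 5))  # [('node-a:101', 7.5)]
```

Only the top-level search action is matched here, as in the script; the per-shard entry in the sample (102) is skipped by the exact comparison.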

Create a slow query in Kibana Dev Tools:

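# Any reasonably large index will do.
# Note: a size above index.max_result_window (10000 by default) is rejected before it runs, so that setting may need to be raised on the test index.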
GET /.monitoring-es-7-2024.10.15/_search
{"size":500000,
  "query": {
    "match_all": {}
  },
  "timeout": "10s"
}

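Then run the Python script above:

python elk_slow_query_kill.py

The effect shows up after a few seconds. (In my own run I also took a flock lock, which makes the script easy to use from crontab.)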
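
The flock wrapper itself is not shown in this post. As a rough stand-in, the same single-instance guard can be sketched inside Python with the standard `fcntl.flock` call. This swaps the flock command for an in-process lock and assumes a Unix-like system; the helper name `acquire_single_instance_lock` and the lock file name are made up for the example.

```python
import fcntl
import os
import tempfile


def acquire_single_instance_lock(lock_path):
    """Return an open file holding an exclusive lock, or None if someone else holds it."""
    lock_file = open(lock_path, "w")
    try:
        # LOCK_NB makes the call fail at once instead of waiting for the holder.
        fcntl.flock(lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except OSError:
        lock_file.close()
        return None
    return lock_file  # keep this object alive for as long as the lock is needed


# Demo in a throwaway directory; a real cron job would use one fixed path.
demo_dir = tempfile.mkdtemp()
lock_path = os.path.join(demo_dir, "elk_slow_query_kill.lock")

first = acquire_single_instance_lock(lock_path)
second = acquire_single_instance_lock(lock_path)  # simulates an overlapping run
print(first is not None, second is None)
```

A cron-driven run would typically take the lock, do one pass of the check, and exit, so that an invocation that overlaps a still-running one simply gives up.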

After the kill, Kibana Dev Tools shows the following:

{
  "error": {
    "root_cause": [
      {
        "type": "task_cancelled_exception",
        "reason": "cancelled"
      }
    ],
    "type": "search_phase_execution_exception",
    "reason": "all shards failed",
    "phase": "query",
    "grouped": true,
    "failed_shards": [
      {
        "shard": 0,
        "index": ".monitoring-es-7-2024.10.15",
        "node": "MLCJQzgmRzOHpHBIgDqD4Q",
        "reason": {
          "type": "task_cancelled_exception",
          "reason": "cancelled"
        }
      }
    ]
  },
  "status": 400
}
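
A client that issues searches may want to tell this case apart from other failures. The following is a minimal sketch that only inspects the fields visible in the response above (root_cause and failed_shards); the function name `was_cancelled` and the second sample body are made up for illustration.

```python
def was_cancelled(response_body):
    """Return True if an ES error body reports a task_cancelled_exception."""
    error = response_body.get("error") or {}
    if not isinstance(error, dict):
        return False
    # Collect the root causes plus the per-shard failure reasons.
    causes = list(error.get("root_cause") or [])
    causes += [shard.get("reason") or {} for shard in error.get("failed_shards") or []]
    return any(cause.get("type") == "task_cancelled_exception" for cause in causes)


# Trimmed copy of the response shown above.
cancelled_body = {
    "error": {
        "root_cause": [{"type": "task_cancelled_exception", "reason": "cancelled"}],
        "type": "search_phase_execution_exception",
        "reason": "all shards failed",
        "failed_shards": [
            {"shard": 0,
             "reason": {"type": "task_cancelled_exception", "reason": "cancelled"}}
        ],
    },
    "status": 400,
}

# Hypothetical unrelated error, for contrast.
other_body = {
    "error": {
        "root_cause": [{"type": "index_not_found_exception", "reason": "no such index"}],
        "type": "index_not_found_exception",
    },
    "status": 404,
}

print(was_cancelled(cancelled_body), was_cancelled(other_body))  # True False
```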
