Elasticsearch对并发冲突的整理

  • Post author:
  • Post category:其他


Elasticsearch对并发冲突的解决



背景

一同事写多线程执行批量修改时发现,A线程对index中doc字段state 1改为2,B线程对index中doc字段type 2改为3。执行之后查询结果发现部分doc的state字段仍为1或者type仍为2,这个时候才想起了ES

不支持事物

,然后就对这一块进行一个总结。


运行环境 7.12.1



官方对于并发冲突的说明


官方全部更新说明连接处



官方部分更新说明连接处



其中对里面的描述


对Elasticsearch的doc执行修改动作,但实际上 Elasticsearch 按前述完全相同方式执行以下过程:

  • 从旧文档构建 JSON
  • 更改该 JSON
  • 删除旧文档
  • 索引一个新文档

    从描述上就能很容易看到es对事物的支持不好,单个的更新也仅仅是为了节省网络资源,操作并不是

    原子性

    的。



    官方对于并发冲突的解决办法(乐观并发控制)


    为什么选择乐观并发控制(乐观锁)


    https://www.elastic.co/guide/cn/elasticsearch/guide/current/version-control.html


    具体的控制方法(_version 或者联合version_type=external)


    https://www.elastic.co/guide/cn/elasticsearch/guide/current/optimistic-concurrency-control.html

    Elasticsearch 是分布式的。当文档创建、更新或删除时, 新版本的文档必须复制到集群中的其他节点。Elasticsearch 也是异步和并发的,这意味着这些复制请求被并行发送,并且到达目的地时也许 顺序是乱的 。 Elasticsearch 需要一种方法确保文档的旧版本不会覆盖新的版本。

 root@UCSS-SK135-sps:/# curl  -u use:passwd http://169.254.253.5:9200/_cat/indices?v  //查询索引
 root@UCSS-SK135-sps:/# curl  -u use:passwd http://169.254.253.5:9200/swg-20210816-01/_search?pretty //根据索引找到里面的doc,然后找到_id
 root@UCSS-SK135-sps:/# curl  -u use:passwd http://169.254.253.5:9200/swg-20210816-01/_doc/99b33625-e32e-4d82-b056-419396672c0d?pretty   //根据_id 找到doc,并找到里面的_version
{
  "_index" : "swg-20210816-01",
  "_type" : "_doc",
  "_id" : "99b33625-e32e-4d82-b056-419396672c0d",
  "_version" : 1,
  "_seq_no" : 3,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "serialId" : 0,
    "sourceIp" : "172.18.0.101",
    "detectDateTime" : "2021-08-16T06:11:06.294+0000",
    "recordDateTime" : "2021-08-16T06:11:17.422+0000",
    "detectTime" : "06:11:06",
    "detectType" : 0,
    "actionType" : 1,
    "channelType" : 1,
    "fullUrl" : "http://172.16.0.1/post.php",
    "urlHostname" : "172.16.0.1",
    "destinationUrl" : "http://172.16.0.1",
    "destinationIp" : "172.16.0.1",
    "port" : 80,
    "method" : "GET",
    "statusCode" : 200,
    "contentType" : "text/html; charset=UTF-8",
    "categoryTypes" : [
      1291
    ],
    "riskType" : 0,
    "severityType" : 4,
    "isSecure" : true,
    "statusType" : 0,
    "browseTime" : 20,
    "hits" : 1,
    "bytesUpload" : 0,
    "bytesDownload" : 0,
    "bytesSent" : 129,
    "bytesReceived" : 2581,
    "deviceId" : "2429ab75-91b4-3c43-3b08-5f395a0b3133",
    "deviceName" : "SWG-SK135",
    "isTruncated" : true,
    "isConsolidated" : false,
    "bodyBytes" : 1159,
    "source" : {
      "displayName" : "172.18.0.101",
      "ip" : "172.18.0.101"
    },
    "policyUuid" : [
      "aa915076-d5c2-4656-a2fe-48b56a4b1d8e"
    ],
    "policyName" : [
      "默认策略"
    ],
    "threatType" : [
      0
    ],
    "attachments" : [
      {
        "filename" : "post.php",
        "fileType" : 1,
        "fileTypeName" : "不可识别的文件类型",
        "fileSize" : 0
      }
    ],
    "hierarchy" : [
      0
    ],
    "httpReqHdrBytes" : 129,
    "httpReqBodyBytes" : 0,
    "httpRspHdrBytes" : 263,
    "httpRspBodyBytes" : 1159,
    "blockType" : 0,
    "ars" : 0.0,
    "mrs" : 0.0,
    "ers" : 0.0,
    "trs" : 0.0,
    "cloudAppCategory" : 0,
    "cloudAppTrustValue" : 0.0,
    "cloudAppId" : 0,
    "sessionStage" : [
      1,
      2
    ],
    "userAgent" : "curl/7.26.0",
    "riskLevel" : 0.0,
    "isMobile" : false,
    "networkType" : 0,
    "queryType" : 2,
    "quotaActionType" : 0,
    "uuid" : "99b33625-e32e-4d82-b056-419396672c0d"
  }
}

在修改某条数据的时候先查找到_version,在更新的时候传入该字段

   root@UCSS-SK135-sps:/# curl -X POST -H "Content-Type: application/json"  -u use:passwd http://169.254.253.5:9200/swg-20210816-01/_update/99b33625-e32e-4d82-b056-419396672c0d?version=2 -d '{"doc":{"ars":22}}'  

   
   {
	"error": {
		"root_cause": [{
			"type": "action_request_validation_exception",
			"reason": "Validation Failed: 1: internal versioning can not be used for optimistic concurrency control. Please use `if_seq_no` and `if_primary_term` instead;"
		}],
		"type": "action_request_validation_exception",
		"reason": "Validation Failed: 1: internal versioning can not be used for optimistic concurrency control. Please use `if_seq_no` and `if_primary_term` instead;"
	}
	


此刻出现错误???提示废弃让用新的,然后就去找最新的文档查看



https://www.elastic.co/guide/en/elasticsearch/reference/current/optimistic-concurrency-control.html



https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html


没说不让用,严格按照它说的操作只是说了

if_seq_no和 if_primary_term

参数用来并发操作的时候使用

	 curl  -u use:passwd -H "Content-Type: application/json"  -X PUT http://169.254.253.5:9200/_cluster/settings -d '{"persistent": {"action.auto_create_index": "mytest"}}' 
	 curl  -u use:passwd -H "Content-Type: application/json"  -X PUT http://169.254.253.5:9200/mytest/_doc/1 -d '{"message":"mytest"}' 
	 curl  -u use:passwd  http://169.254.253.5:9200/mytest/_doc/1?pretty
	 curl  -u use:passwd  -H "Content-Type: application/json"  -X PUT  -d '{"message":"22"}' http://169.254.253.5:9200/mytest/_doc/1?version=4&version_type=external 

以旧提示让用新的方法。

root@UCSS-SK135-sps:/# curl  -u use:passwd   -H "Content-Type: application/json"  -X PUT  -d '{"message":"22"}' "http://169.254.253.5:9200/mytest/_doc/1?if_seq_no=5&if_primary_term=1&pretty"
{
  "_index" : "mytest",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 7,
  "result" : "updated",
  "_shards" : {
    "total" : 2,
    "successful" : 1,
    "failed" : 0
  },
  "_seq_no" : 6,
  "_primary_term" : 1
}

用新的方法尝试可以更新,感觉官网不靠谱,废弃或者替代应该声明,如果发现可以用请留言。



已经找到对应官方说明(不支持version)


https://www.elastic.co/guide/en/elasticsearch/reference/7.14/release-notes-7.0.0-beta1.html

Remove support for internal versioning for concurrency control #38254

(issue: #1078)

在release的备注里面进行了说明,但是没有在文档中移除。



上面介绍的只能处理单个DOC的并发问题,对于批量的解决办法

[https://www.elastic.co/guide/cn/elasticsearch/guide/current/denormalization-concurrency.html](https://www.elastic.co/guide/cn/elasticsearch/guide/current/concurrency-solutions.html)

https://www.elastic.co/guide/cn/elasticsearch/guide/current/concurrency-solutions.html

官方提供了三种方法

以下是三个切实可行的使用 Elasticsearch 的解决方案,它们都涉及某种形式的锁:

  • 全局锁 global可以随意更改是一个锁的名称
  • 文档锁
  • 树锁


全局锁


在最新的文档中未发现,但是依旧可以使用

root@UCSS-SK135-sps:/# curl  -u use:passwd  -H "Content-Type: application/json"  -X PUT  -d '{"message":"22"}' http://169.254.253.5:9200/mytest/_doc/2/_create

{"_index":"mytest","_type":"_doc","_id":"2","_version":1,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":7,"_primary_term":1}
root@UCSS-SK135-sps:/# curl  -u use:passwd   -H "Content-Type: application/json"  -X PUT  -d '{"message":"22"}' http://169.254.253.5:9200/mytest/_doc/2/_create

{"error":{"root_cause":[{"type":"version_conflict_engine_exception","reason":"[2]: version conflict, document already exists (current version [1])","index_uuid":"gvVR71CqSHuO8hxT-Af1-g","shard":"0","index":"mytest"}],"type":"version_conflict_engine_exception","reason":"[2]: version conflict, document already exists (current version [1])","index_uuid":"gvVR71CqSHuO8hxT-Af1-g","shard":"0","index":"mytest"},"status":409}

http://169.254.253.5:9200/mytest/_doc/2/_create

http://169.254.253.5:9200/{indexName}/_doc/{约定锁的ID}/_create

文档锁还有树锁应该也可以使用,不再尝试

大家可以参考的文档


https://www.cnblogs.com/huangying2124/p/12806508.html



https://blog.csdn.net/u011262847/article/details/78118142


可以用elasticsearch的特性实现排他和共享锁



版权声明:本文为qq_18746961原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。