mirror of
https://github.com/projecthorus/sondehub-infra.git
synced 2024-12-24 07:16:42 +00:00
fix up bulk APIs
This commit is contained in:
parent
037b274054
commit
655bceb9df
@ -31,6 +31,7 @@ def request(payload, path, method, params=None):
|
|||||||
r = es_session.send(request.prepare())
|
r = es_session.send(request.prepare())
|
||||||
|
|
||||||
if r.status_code != 200 and r.status_code != 201:
|
if r.status_code != 200 and r.status_code != 201:
|
||||||
|
print(zlib.decompress(r.content, 16 + zlib.MAX_WBITS))
|
||||||
raise RuntimeError
|
raise RuntimeError
|
||||||
|
|
||||||
if (
|
if (
|
||||||
|
@ -382,7 +382,7 @@ def bulk_upload_es(index_prefix,payloads):
|
|||||||
body += "{\"index\":{}}\n" + json.dumps(payload) + "\n"
|
body += "{\"index\":{}}\n" + json.dumps(payload) + "\n"
|
||||||
body += "\n"
|
body += "\n"
|
||||||
date_prefix = datetime.now().strftime("%Y-%m")
|
date_prefix = datetime.now().strftime("%Y-%m")
|
||||||
result = es.request(body, f"{index_prefix}-{date_prefix}/_doc/_bulk", "POST")
|
result = es.request(body, f"{index_prefix}-{date_prefix}/_bulk", "POST")
|
||||||
|
|
||||||
if 'errors' in result and result['errors'] == True:
|
if 'errors' in result and result['errors'] == True:
|
||||||
error_types = [x['index']['error']['type'] for x in result['items'] if 'error' in x['index']] # get all the error types
|
error_types = [x['index']['error']['type'] for x in result['items'] if 'error' in x['index']] # get all the error types
|
||||||
|
@ -28,7 +28,7 @@ def lambda_handler(event, context):
|
|||||||
body += "{\"index\":{}}\n" + json.dumps(payload) + "\n"
|
body += "{\"index\":{}}\n" + json.dumps(payload) + "\n"
|
||||||
body += "\n"
|
body += "\n"
|
||||||
|
|
||||||
result = es.request(body, f"ham-telm-{index}/_doc/_bulk", "POST")
|
result = es.request(body, f"ham-telm-{index}/_bulk", "POST")
|
||||||
if 'errors' in result and result['errors'] == True:
|
if 'errors' in result and result['errors'] == True:
|
||||||
error_types = [x['index']['error']['type'] for x in result['items'] if 'error' in x['index']] # get all the error types
|
error_types = [x['index']['error']['type'] for x in result['items'] if 'error' in x['index']] # get all the error types
|
||||||
print(event)
|
print(event)
|
||||||
|
@ -416,7 +416,7 @@ def bulk_upload_es(index_prefix,payloads):
|
|||||||
body += "{\"index\":{}}\n" + json.dumps(payload) + "\n"
|
body += "{\"index\":{}}\n" + json.dumps(payload) + "\n"
|
||||||
body += "\n"
|
body += "\n"
|
||||||
date_prefix = datetime.now().strftime("%Y-%m")
|
date_prefix = datetime.now().strftime("%Y-%m")
|
||||||
result = es.request(body, f"{index_prefix}-{date_prefix}/_doc/_bulk", "POST")
|
result = es.request(body, f"{index_prefix}-{date_prefix}/_bulk", "POST")
|
||||||
|
|
||||||
if 'errors' in result and result['errors'] == True:
|
if 'errors' in result and result['errors'] == True:
|
||||||
error_types = [x['index']['error']['type'] for x in result['items'] if 'error' in x['index']] # get all the error types
|
error_types = [x['index']['error']['type'] for x in result['items'] if 'error' in x['index']] # get all the error types
|
||||||
|
@ -25,7 +25,7 @@ def lambda_handler(event, context):
|
|||||||
body += "{\"index\":{}}\n" + json.dumps(payload) + "\n"
|
body += "{\"index\":{}}\n" + json.dumps(payload) + "\n"
|
||||||
body += "\n"
|
body += "\n"
|
||||||
|
|
||||||
result = es.request(body, f"telm-{index}/_doc/_bulk", "POST")
|
result = es.request(body, f"telm-{index}/_bulk", "POST")
|
||||||
if 'errors' in result and result['errors'] == True:
|
if 'errors' in result and result['errors'] == True:
|
||||||
error_types = [x['index']['error']['type'] for x in result['items'] if 'error' in x['index']] # get all the error types
|
error_types = [x['index']['error']['type'] for x in result['items'] if 'error' in x['index']] # get all the error types
|
||||||
print(event)
|
print(event)
|
||||||
|
Loading…
Reference in New Issue
Block a user