Start logging rethinkdb open/close as there appears to be stuck connections
This commit is contained in:
parent
f2ad84e0c5
commit
5c90d3e7fa
|
@ -2,7 +2,9 @@ import ruamel.yaml as yaml
|
||||||
import asyncio
|
import asyncio
|
||||||
import rethinkdb as r
|
import rethinkdb as r
|
||||||
import pendulum
|
import pendulum
|
||||||
|
import logging
|
||||||
|
|
||||||
|
log = logging.getLogger()
|
||||||
loop = asyncio.get_event_loop()
|
loop = asyncio.get_event_loop()
|
||||||
global_config = {}
|
global_config = {}
|
||||||
|
|
||||||
|
@ -129,30 +131,35 @@ def command_prefix(bot, message):
|
||||||
async def add_content(table, content):
|
async def add_content(table, content):
|
||||||
r.set_loop_type("asyncio")
|
r.set_loop_type("asyncio")
|
||||||
conn = await r.connect(**db_opts)
|
conn = await r.connect(**db_opts)
|
||||||
|
log.info("RethinkDB Instance opened. Table: {}---Content: {}---Method = add_content".format(table, content))
|
||||||
# First we need to make sure that this entry doesn't exist
|
# First we need to make sure that this entry doesn't exist
|
||||||
# For all rethinkDB cares, multiple entries can exist with the same content
|
# For all rethinkDB cares, multiple entries can exist with the same content
|
||||||
# For our purposes however, we do not want this
|
# For our purposes however, we do not want this
|
||||||
try:
|
try:
|
||||||
result = await r.table(table).insert(content).run(conn)
|
result = await r.table(table).insert(content).run(conn)
|
||||||
await conn.close()
|
|
||||||
except r.ReqlOpFailedError:
|
except r.ReqlOpFailedError:
|
||||||
# This means the table does not exist
|
# This means the table does not exist
|
||||||
await r.table_create(table).run(conn)
|
await r.table_create(table).run(conn)
|
||||||
await r.table(table).insert(content).run(conn)
|
await r.table(table).insert(content).run(conn)
|
||||||
await conn.close()
|
|
||||||
result = {}
|
result = {}
|
||||||
|
|
||||||
|
await conn.close()
|
||||||
|
log.info("RethinkDB Instance closed. Table: {}---Content: {}---Method = add_content".format(table, content))
|
||||||
return result.get('inserted', 0) > 0
|
return result.get('inserted', 0) > 0
|
||||||
|
|
||||||
|
|
||||||
async def remove_content(table, key):
|
async def remove_content(table, key):
|
||||||
r.set_loop_type("asyncio")
|
r.set_loop_type("asyncio")
|
||||||
conn = await r.connect(**db_opts)
|
conn = await r.connect(**db_opts)
|
||||||
|
log.info("RethinkDB Instance opened. Table: {}---Key: {}---Method: remove_content".format(table, key))
|
||||||
try:
|
try:
|
||||||
result = await r.table(table).get(key).delete().run(conn)
|
result = await r.table(table).get(key).delete().run(conn)
|
||||||
except r.ReqlOpFailedError:
|
except r.ReqlOpFailedError:
|
||||||
result = {}
|
result = {}
|
||||||
pass
|
pass
|
||||||
|
|
||||||
await conn.close()
|
await conn.close()
|
||||||
|
log.info("RethinkDB Instance closed. Table: {}---Key: {}---Method: remove_content".format(table, key))
|
||||||
if table == 'prefixes' or table == 'server_settings':
|
if table == 'prefixes' or table == 'server_settings':
|
||||||
loop.create_task(cache[table].update())
|
loop.create_task(cache[table].update())
|
||||||
return result.get('deleted', 0) > 0
|
return result.get('deleted', 0) > 0
|
||||||
|
@ -161,6 +168,7 @@ async def remove_content(table, key):
|
||||||
async def update_content(table, content, key):
|
async def update_content(table, content, key):
|
||||||
r.set_loop_type("asyncio")
|
r.set_loop_type("asyncio")
|
||||||
conn = await r.connect(**db_opts)
|
conn = await r.connect(**db_opts)
|
||||||
|
log.info("RethinkDB Instance opened. Table: {}---Content: {}---Key: {}---Method: update_content".format(table, content, key))
|
||||||
# This method is only for updating content, so if we find that it doesn't exist, just return false
|
# This method is only for updating content, so if we find that it doesn't exist, just return false
|
||||||
try:
|
try:
|
||||||
# Update based on the content and filter passed to us
|
# Update based on the content and filter passed to us
|
||||||
|
@ -168,9 +176,10 @@ async def update_content(table, content, key):
|
||||||
# This is why we're accepting a variable and using it, whatever it may be, as the query
|
# This is why we're accepting a variable and using it, whatever it may be, as the query
|
||||||
result = await r.table(table).get(key).update(content).run(conn)
|
result = await r.table(table).get(key).update(content).run(conn)
|
||||||
except r.ReqlOpFailedError:
|
except r.ReqlOpFailedError:
|
||||||
await conn.close()
|
|
||||||
result = {}
|
result = {}
|
||||||
|
|
||||||
await conn.close()
|
await conn.close()
|
||||||
|
log.info("RethinkDB Instance closed. Table: {}---Content: {}---Key: {}---Method: update_content".format(table, content, key))
|
||||||
if table == 'prefixes' or table == 'server_settings':
|
if table == 'prefixes' or table == 'server_settings':
|
||||||
loop.create_task(cache[table].update())
|
loop.create_task(cache[table].update())
|
||||||
return result.get('replaced', 0) > 0 or result.get('unchanged', 0) > 0
|
return result.get('replaced', 0) > 0 or result.get('unchanged', 0) > 0
|
||||||
|
@ -180,12 +189,14 @@ async def replace_content(table, content, key):
|
||||||
# This method is here because .replace and .update can have some different functionalities
|
# This method is here because .replace and .update can have some different functionalities
|
||||||
r.set_loop_type("asyncio")
|
r.set_loop_type("asyncio")
|
||||||
conn = await r.connect(**db_opts)
|
conn = await r.connect(**db_opts)
|
||||||
|
log.info("RethinkDB Instance opened. Table: {}---Content: {}---Key: {}---Method: replace_content".format(table, content, key))
|
||||||
try:
|
try:
|
||||||
result = await r.table(table).get(key).replace(content).run(conn)
|
result = await r.table(table).get(key).replace(content).run(conn)
|
||||||
except r.ReqlOpFailedError:
|
except r.ReqlOpFailedError:
|
||||||
await conn.close()
|
|
||||||
result = {}
|
result = {}
|
||||||
|
|
||||||
await conn.close()
|
await conn.close()
|
||||||
|
log.info("RethinkDB Instance closed. Table: {}---Content: {}---Key: {}---Method: replace_content".format(table, content, key))
|
||||||
if table == 'prefixes' or table == 'server_settings':
|
if table == 'prefixes' or table == 'server_settings':
|
||||||
loop.create_task(cache[table].update())
|
loop.create_task(cache[table].update())
|
||||||
return result.get('replaced', 0) > 0 or result.get('unchanged', 0) > 0
|
return result.get('replaced', 0) > 0 or result.get('unchanged', 0) > 0
|
||||||
|
@ -194,6 +205,7 @@ async def replace_content(table, content, key):
|
||||||
async def get_content(table, key=None):
|
async def get_content(table, key=None):
|
||||||
r.set_loop_type("asyncio")
|
r.set_loop_type("asyncio")
|
||||||
conn = await r.connect(**db_opts)
|
conn = await r.connect(**db_opts)
|
||||||
|
log.info("RethinkDB Instance opened. Table: {}---Key: {}---Method: get_content".format(table, key))
|
||||||
|
|
||||||
try:
|
try:
|
||||||
if key:
|
if key:
|
||||||
|
@ -210,7 +222,9 @@ async def get_content(table, key=None):
|
||||||
content = cursor
|
content = cursor
|
||||||
except (IndexError, r.ReqlOpFailedError):
|
except (IndexError, r.ReqlOpFailedError):
|
||||||
content = None
|
content = None
|
||||||
|
|
||||||
await conn.close()
|
await conn.close()
|
||||||
|
log.info("RethinkDB Instance closed. Table: {}---Key: {}---Method: get_content".format(table, key))
|
||||||
if table == 'prefixes' or table == 'server_settings':
|
if table == 'prefixes' or table == 'server_settings':
|
||||||
loop.create_task(cache[table].update())
|
loop.create_task(cache[table].update())
|
||||||
return content
|
return content
|
||||||
|
@ -218,6 +232,7 @@ async def get_content(table, key=None):
|
||||||
async def filter_content(table: str, r_filter):
|
async def filter_content(table: str, r_filter):
|
||||||
r.set_loop_type("asyncio")
|
r.set_loop_type("asyncio")
|
||||||
conn = await r.connect(**db_opts)
|
conn = await r.connect(**db_opts)
|
||||||
|
log.info("RethinkDB Instance opened. Table: {}---Filter: {}---Method: filter_content".format(table, r_filter))
|
||||||
try:
|
try:
|
||||||
cursor = await r.table(table).filter(r_filter).run(conn)
|
cursor = await r.table(table).filter(r_filter).run(conn)
|
||||||
content = await _convert_to_list(cursor)
|
content = await _convert_to_list(cursor)
|
||||||
|
@ -225,7 +240,9 @@ async def filter_content(table: str, r_filter):
|
||||||
content = None
|
content = None
|
||||||
except (IndexError, r.ReqlOpFailedError):
|
except (IndexError, r.ReqlOpFailedError):
|
||||||
content = None
|
content = None
|
||||||
|
|
||||||
await conn.close()
|
await conn.close()
|
||||||
|
log.info("RethinkDB Instance closed. Table: {}---Filter: {}---Method: filter_content".format(table, r_filter))
|
||||||
if table == 'prefixes' or table == 'server_settings':
|
if table == 'prefixes' or table == 'server_settings':
|
||||||
loop.create_task(cache[table].update())
|
loop.create_task(cache[table].update())
|
||||||
return content
|
return content
|
||||||
|
|
Loading…
Reference in a new issue