Misc. updates.

This commit is contained in:
arkerekon 2023-01-10 09:10:47 -05:00 committed by Reily Siegel
parent e813a5d72d
commit e3321000cb
Signed by untrusted user: 1114
GPG Key ID: 508A5AD0A50F88AF
19 changed files with 3375 additions and 3303 deletions

24
.gitignore vendored
View File

@ -1,13 +1,13 @@
apitoken.txt
killswitch.txt
sheets_credentials.json
sheets_token.json
*.bak
*.dat
*.dir
*.pyc
*.swp
*.log
script_log.txt
towels_rolled
apitoken.txt
killswitch.txt
sheets_credentials.json
sheets_token.json
*.bak
*.dat
*.dir
*.pyc
*.swp
*.log
script_log.txt
towels_rolled
user_scrolls

9
Dockerfile Normal file
View File

@ -0,0 +1,9 @@
FROM python:3.8-slim
WORKDIR /app
COPY . .
RUN pip3 install --user -r dependencies.txt
CMD python3 -u main.py

854
client.py
View File

@ -1,427 +1,427 @@
from __future__ import annotations
import asyncio
import json
import pprint
import sys
import traceback
import logging
from pprint import pformat
from typing import List, Any, AsyncGenerator, Dict, Coroutine, TypeVar
from typing import Optional
from aiohttp import web
from slackclient import SlackClient
import hooks
import settings
import slack_util
"""
Objects to wrap slack connections
"""
# Read the API token
api_file = open("apitoken.txt", 'r')
SLACK_API = next(api_file).strip()
api_file.close()
class ClientWrapper(object):
"""
Essentially the main state object.
We only ever expect one of these per api token.
Holds a slack client, and handles messsages.
"""
def __init__(self, api_token):
# Init slack
self.slack = SlackClient(api_token)
# Hooks go regex -> callback on (slack, msg, match)
self.hooks: List[hooks.AbsHook] = []
# Periodicals are just wrappers around an iterable, basically
self.passives: List[hooks.Passive] = []
# Cache users and channels
self.users: Dict[str, slack_util.User] = {}
self.conversations: Dict[str, slack_util.Conversation] = {}
# Scheduled/passive events handling
def add_passive(self, per: hooks.Passive) -> None:
self.passives.append(per)
async def run_passives(self) -> None:
"""
Run all currently added passives
"""
awaitables = [p.run() for p in self.passives]
await asyncio.gather(*awaitables)
# Incoming slack hook handling
def add_hook(self, hook: hooks.AbsHook) -> None:
self.hooks.append(hook)
async def handle_events(self) -> None:
"""
Asynchronous tasks that eternally reads and responds to messages.
"""
# Create a queue
queue = asyncio.Queue()
# Create a task to put rtm events to the queue
rtm_task = asyncio.create_task(self.rtm_event_feed(queue))
# Create a task to put http events to the queue
http_task = asyncio.create_task(self.http_event_feed(queue))
# Create a task to handle all other tasks
async def handle_task_loop():
async for t3 in self.spool_tasks(queue):
sys.stdout.flush()
if settings.SINGLE_THREAD_TASKS:
await t3
# Handle them all
await asyncio.gather(rtm_task, http_task, handle_task_loop())
async def rtm_event_feed(self, msg_queue: asyncio.Queue) -> None:
"""
Async wrapper around the message feed.
Yields messages awaitably forever.
"""
# Create the msg feed
feed = slack_util.message_stream(self.slack)
# Create a simple callable that gets one message from the feed
def get_one():
return next(feed)
# Continuously yield async threaded tasks that poll the feed
while True:
next_event = await asyncio.get_running_loop().run_in_executor(None, get_one)
await msg_queue.put(next_event)
async def http_event_feed(self, event_queue: asyncio.Queue) -> None:
# Create a callback to convert requests to events
async def interr(request: web.Request):
if request.can_read_body:
# Get the payload
post_params = await request.post()
payload = json.loads(post_params["payload"])
logging.info("\nInteraction Event received:")
logging.debug(pformat(payload))
# Handle each action separately
if "actions" in payload:
for action in payload["actions"]:
# Start building the event
ev = slack_util.Event()
# Get the user who clicked the button
ev.user = slack_util.UserContext(payload["user"]["id"])
# Get the message that they clicked
ev.message = slack_util.RelatedMessageContext(payload["message"]["ts"], payload["message"]["text"])
# Get the channel it was clicked in
ev.conversation = slack_util.ConversationContext(payload["channel"]["id"])
# Get the message this button/action was attached to
ev.interaction = slack_util.InteractionContext(payload["response_url"],
payload["trigger_id"],
action["block_id"],
action["action_id"],
action.get("value"))
# Put it in the queue
await event_queue.put(ev)
# Respond that everything is fine
return web.Response(status=200)
else:
logging.error("\nMalformed event received.")
# If we can't read it, get mad
return web.Response(status=400)
# Create the server
app = web.Application()
app.add_routes([web.post('/bothttpcallback', interr)])
# Asynchronously serve that boy up
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, port=31019)
await site.start()
logging.info("Server up")
# while True:
# await asyncio.sleep(30)
async def spool_tasks(self, event_queue: asyncio.Queue) -> AsyncGenerator[asyncio.Task, Any]:
"""
Read in from async event feed, and spool them out as async tasks
"""
while True:
event: slack_util.Event = await event_queue.get()
# Find which hook, if any, satisfies
for hook in list(self.hooks): # Note that we do list(self.hooks) to avoid edit-while-iterating issues
# Try invoking each
try:
# Try to make a coroutine handling the message
coro = hook.try_apply(event)
# If we get a coro back, then task it up and set consumption appropriately
if coro is not None:
logging.debug("Spawned task. Now {} running total.".format(len(asyncio.all_tasks())))
yield asyncio.create_task(_exception_printing_task(coro))
if hook.consumes:
break
except hooks.HookDeath:
# If a hook wants to die, let it.
self.hooks.remove(hook)
# Data getting/sending
def get_conversation(self, conversation_id: str) -> Optional[slack_util.Conversation]:
return self.conversations.get(conversation_id)
def get_conversation_by_name(self, conversation_identifier: str) -> Optional[slack_util.Conversation]:
# If looking for a direct message, first lookup user, then fetch
if conversation_identifier[0] == "@":
# user_name = conversation_identifier
# Find the user by their name
raise NotImplementedError("There wasn't a clear use case for this yet, so we've opted to just not use it")
# If looking for a channel, just lookup normally
elif conversation_identifier[0] == "#":
channel_name = conversation_identifier
# Find the channel in the dict
for channel in self.conversations.values():
if channel.name == channel_name:
return channel
# If it doesn't fit the above, we don't know how to process
else:
raise ValueError("Please give either an #channel-name or @user-name")
# If we haven't returned already, give up and return None
return None
def get_user(self, user_id: str) -> Optional[slack_util.User]:
return self.users.get(user_id)
def get_user_by_name(self, user_name: str) -> Optional[slack_util.User]:
raise NotImplementedError()
def api_call(self, api_method, **kwargs):
return self.slack.api_call(api_method, **kwargs)
# Simpler wrappers around message sending/replying
def reply(self, event: slack_util.Event, text: str, in_thread: bool = True) -> dict:
"""
Replies to a message.
Message must have a channel and message context.
Returns the JSON response.
"""
# Ensure we're actually replying to a valid message
assert (event.conversation and event.message) is not None
# Send in a thread by default
if in_thread:
# Figure otu what thread to send it to
thread = event.message.ts
if event.thread:
thread = event.thread.thread_ts
return self.send_message(text, event.conversation.conversation_id, thread=thread)
else:
return self.send_message(text, event.conversation.conversation_id)
def _send_core(self, api_method: str, text: str, channel_id: str, thread: str, broadcast: bool,
blocks: Optional[List[dict]]) -> dict:
"""
Copy of the internal send message function of slack, with some helpful options.
Returns the JSON response.
"""
# Check that text exists if there are no blocks
if blocks is None and text is None:
raise ValueError("Must provide blocks or texts or both.")
elif text is None:
text = "_Block message. Open slack client to view_"
# Begin constructing kwargs with fields that _must_ exist
kwargs = {"channel": channel_id, "text": text, "as_user": True, "parse": True}
# Deduce thread stuff
if thread:
kwargs["thread_ts"] = thread
if broadcast:
kwargs["reply_broadcast"] = True
elif broadcast:
raise ValueError("Can't broadcast a non-threaded message. Try again.")
# Set blocks iff provided.
if blocks is not None:
kwargs["blocks"] = blocks
result = self.api_call(api_method, **kwargs)
logging.info("Tried to send message \"{}\". Got response:\n {}".format(text, pprint.pformat(result)))
return result
def send_message(self,
text: Optional[str],
channel_id: str,
thread: str = None,
broadcast: bool = False,
blocks: Optional[List[dict]] = None) -> dict:
"""
Wraps _send_core for normal messages
"""
return self._send_core("chat.postMessage", text, channel_id, thread, broadcast, blocks)
def send_ephemeral(self,
text: Optional[str],
channel_id: str,
thread: str = None,
blocks: Optional[List[dict]] = None) -> dict:
"""
Wraps _send_core for ephemeral messages
"""
return self._send_core("chat.postEphemeral", text, channel_id, thread, False, blocks)
def edit_message(self, text: Optional[str], channel_id: str, message_ts: str, blocks: Optional[List[dict]] = None):
"""
Edits a message.
"""
# Check that text exists if there are no blocks
if blocks is None and text is None:
raise ValueError("Must provide blocks or texts or both.")
elif text is None:
text = "_Block message. Open slack client to view_"
# Set the default args
kwargs = {"ts": message_ts, "channel": channel_id, "text": text, "as_user": True}
# Set blocks if provided
if blocks is not None:
kwargs["blocks"] = blocks
return self.api_call("chat.update", **kwargs)
# Update slack data
def update_channels(self):
"""
Queries the slack API for all current channels
"""
# Necessary because of pagination
cursor = None
# Make a new dict to use
new_dict = {}
# Iterate over results
while True:
# Set args depending on if a cursor exists
args = {"limit": 1000, "types": "public_channel,private_channel,mpim,im"}
if cursor:
args["cursor"] = cursor
channel_dicts = self.api_call("conversations.list", **args)
# If the response is good, put its results to the dict
if channel_dicts["ok"]:
for channel_dict in channel_dicts["channels"]:
if channel_dict["is_im"]:
new_channel = slack_util.DirectMessage(id=channel_dict["id"],
user_id="@" + channel_dict["user"])
else:
new_channel = slack_util.Channel(id=channel_dict["id"],
name="#" + channel_dict["name"])
new_dict[new_channel.id] = new_channel
# Fetch the cursor
cursor = channel_dicts.get("response_metadata").get("next_cursor")
# If cursor is blank, we're done new channels, just give it up
if cursor == "":
break
else:
logging.warning("Failed to retrieve channels. Message: {}".format(channel_dicts))
break
self.conversations = new_dict
def update_users(self):
"""
Queries the slack API for all current users
"""
# Necessary because of pagination
cursor = None
while True:
# Set args depending on if a cursor exists
args = {"limit": 1000}
if cursor:
args["cursor"] = cursor
user_dicts = self.api_call("users.list", **args)
# Make a new dict to use
new_dict = {}
# If the response is good:
if user_dicts["ok"]:
for user_dict in user_dicts["members"]:
new_user = slack_util.User(id=user_dict.get("id"),
name=user_dict.get("name"),
real_name=user_dict.get("real_name"),
email=user_dict.get("profile").get("email"))
new_dict[new_user.id] = new_user
# Fetch the cursor
cursor = user_dicts.get("response_metadata").get("next_cursor")
# If cursor is blank, we're done new channels, just give it up
if cursor == "":
break
else:
logging.warning("Warning: failed to retrieve users")
break
self.users = new_dict
# Create a single instance of the client wrapper
_singleton = ClientWrapper(SLACK_API)
def get_slack() -> ClientWrapper:
return _singleton
"""
Miscellania
"""
A = TypeVar("A")
B = TypeVar("B")
C = TypeVar("C")
# Prints exceptions instead of silently dropping them in async tasks
async def _exception_printing_task(c: Coroutine[A, B, C]) -> Coroutine[A, B, C]:
# Print exceptions as they pass through
try:
return await c
except Exception:
output = traceback.format_exc()
print(output)
get_slack().send_message(output, "#botzone")
raise
from __future__ import annotations
import asyncio
import json
import pprint
import sys
import traceback
import logging
from pprint import pformat
from typing import List, Any, AsyncGenerator, Dict, Coroutine, TypeVar
from typing import Optional
from aiohttp import web
from slackclient import SlackClient
import hooks
import settings
import slack_util
"""
Objects to wrap slack connections
"""
# Read the API token
api_file = open("apitoken.txt", 'r')
SLACK_API = next(api_file).strip()
api_file.close()
class ClientWrapper(object):
"""
Essentially the main state object.
We only ever expect one of these per api token.
Holds a slack client, and handles messsages.
"""
def __init__(self, api_token):
# Init slack
self.slack = SlackClient(api_token)
# Hooks go regex -> callback on (slack, msg, match)
self.hooks: List[hooks.AbsHook] = []
# Periodicals are just wrappers around an iterable, basically
self.passives: List[hooks.Passive] = []
# Cache users and channels
self.users: Dict[str, slack_util.User] = {}
self.conversations: Dict[str, slack_util.Conversation] = {}
# Scheduled/passive events handling
def add_passive(self, per: hooks.Passive) -> None:
self.passives.append(per)
async def run_passives(self) -> None:
"""
Run all currently added passives
"""
awaitables = [p.run() for p in self.passives]
await asyncio.gather(*awaitables)
# Incoming slack hook handling
def add_hook(self, hook: hooks.AbsHook) -> None:
self.hooks.append(hook)
async def handle_events(self) -> None:
"""
Asynchronous tasks that eternally reads and responds to messages.
"""
# Create a queue
queue = asyncio.Queue()
# Create a task to put rtm events to the queue
rtm_task = asyncio.create_task(self.rtm_event_feed(queue))
# Create a task to put http events to the queue
http_task = asyncio.create_task(self.http_event_feed(queue))
# Create a task to handle all other tasks
async def handle_task_loop():
async for t3 in self.spool_tasks(queue):
sys.stdout.flush()
if settings.SINGLE_THREAD_TASKS:
await t3
# Handle them all
await asyncio.gather(rtm_task, http_task, handle_task_loop())
async def rtm_event_feed(self, msg_queue: asyncio.Queue) -> None:
"""
Async wrapper around the message feed.
Yields messages awaitably forever.
"""
# Create the msg feed
feed = slack_util.message_stream(self.slack)
# Create a simple callable that gets one message from the feed
def get_one():
return next(feed)
# Continuously yield async threaded tasks that poll the feed
while True:
next_event = await asyncio.get_running_loop().run_in_executor(None, get_one)
await msg_queue.put(next_event)
async def http_event_feed(self, event_queue: asyncio.Queue) -> None:
# Create a callback to convert requests to events
async def interr(request: web.Request):
if request.can_read_body:
# Get the payload
post_params = await request.post()
payload = json.loads(post_params["payload"])
logging.info("\nInteraction Event received:")
logging.debug(pformat(payload))
# Handle each action separately
if "actions" in payload:
for action in payload["actions"]:
# Start building the event
ev = slack_util.Event()
# Get the user who clicked the button
ev.user = slack_util.UserContext(payload["user"]["id"])
# Get the message that they clicked
ev.message = slack_util.RelatedMessageContext(payload["message"]["ts"], payload["message"]["text"])
# Get the channel it was clicked in
ev.conversation = slack_util.ConversationContext(payload["channel"]["id"])
# Get the message this button/action was attached to
ev.interaction = slack_util.InteractionContext(payload["response_url"],
payload["trigger_id"],
action["block_id"],
action["action_id"],
action.get("value"))
# Put it in the queue
await event_queue.put(ev)
# Respond that everything is fine
return web.Response(status=200)
else:
logging.error("\nMalformed event received.")
# If we can't read it, get mad
return web.Response(status=400)
# Create the server
app = web.Application()
app.add_routes([web.post('/bothttpcallback', interr)])
# Asynchronously serve that boy up
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, port=31019)
await site.start()
logging.info("Server up")
# while True:
# await asyncio.sleep(30)
async def spool_tasks(self, event_queue: asyncio.Queue) -> AsyncGenerator[asyncio.Task, Any]:
"""
Read in from async event feed, and spool them out as async tasks
"""
while True:
event: slack_util.Event = await event_queue.get()
# Find which hook, if any, satisfies
for hook in list(self.hooks): # Note that we do list(self.hooks) to avoid edit-while-iterating issues
# Try invoking each
try:
# Try to make a coroutine handling the message
coro = hook.try_apply(event)
# If we get a coro back, then task it up and set consumption appropriately
if coro is not None:
logging.debug("Spawned task. Now {} running total.".format(len(asyncio.all_tasks())))
yield asyncio.create_task(_exception_printing_task(coro))
if hook.consumes:
break
except hooks.HookDeath:
# If a hook wants to die, let it.
self.hooks.remove(hook)
# Data getting/sending
def get_conversation(self, conversation_id: str) -> Optional[slack_util.Conversation]:
return self.conversations.get(conversation_id)
def get_conversation_by_name(self, conversation_identifier: str) -> Optional[slack_util.Conversation]:
# If looking for a direct message, first lookup user, then fetch
if conversation_identifier[0] == "@":
# user_name = conversation_identifier
# Find the user by their name
raise NotImplementedError("There wasn't a clear use case for this yet, so we've opted to just not use it")
# If looking for a channel, just lookup normally
elif conversation_identifier[0] == "#":
channel_name = conversation_identifier
# Find the channel in the dict
for channel in self.conversations.values():
if channel.name == channel_name:
return channel
# If it doesn't fit the above, we don't know how to process
else:
raise ValueError("Please give either an #channel-name or @user-name")
# If we haven't returned already, give up and return None
return None
def get_user(self, user_id: str) -> Optional[slack_util.User]:
return self.users.get(user_id)
def get_user_by_name(self, user_name: str) -> Optional[slack_util.User]:
raise NotImplementedError()
def api_call(self, api_method, **kwargs):
return self.slack.api_call(api_method, **kwargs)
# Simpler wrappers around message sending/replying
def reply(self, event: slack_util.Event, text: str, in_thread: bool = True) -> dict:
"""
Replies to a message.
Message must have a channel and message context.
Returns the JSON response.
"""
# Ensure we're actually replying to a valid message
assert (event.conversation and event.message) is not None
# Send in a thread by default
if in_thread:
# Figure otu what thread to send it to
thread = event.message.ts
if event.thread:
thread = event.thread.thread_ts
return self.send_message(text, event.conversation.conversation_id, thread=thread)
else:
return self.send_message(text, event.conversation.conversation_id)
def _send_core(self, api_method: str, text: str, channel_id: str, thread: str, broadcast: bool,
blocks: Optional[List[dict]]) -> dict:
"""
Copy of the internal send message function of slack, with some helpful options.
Returns the JSON response.
"""
# Check that text exists if there are no blocks
if blocks is None and text is None:
raise ValueError("Must provide blocks or texts or both.")
elif text is None:
text = "_Block message. Open slack client to view_"
# Begin constructing kwargs with fields that _must_ exist
kwargs = {"channel": channel_id, "text": text, "as_user": True, "parse": True}
# Deduce thread stuff
if thread:
kwargs["thread_ts"] = thread
if broadcast:
kwargs["reply_broadcast"] = True
elif broadcast:
raise ValueError("Can't broadcast a non-threaded message. Try again.")
# Set blocks iff provided.
if blocks is not None:
kwargs["blocks"] = blocks
result = self.api_call(api_method, **kwargs)
logging.info("Tried to send message \"{}\". Got response:\n {}".format(text, pprint.pformat(result)))
return result
def send_message(self,
text: Optional[str],
channel_id: str,
thread: str = None,
broadcast: bool = False,
blocks: Optional[List[dict]] = None) -> dict:
"""
Wraps _send_core for normal messages
"""
return self._send_core("chat.postMessage", text, channel_id, thread, broadcast, blocks)
def send_ephemeral(self,
text: Optional[str],
channel_id: str,
thread: str = None,
blocks: Optional[List[dict]] = None) -> dict:
"""
Wraps _send_core for ephemeral messages
"""
return self._send_core("chat.postEphemeral", text, channel_id, thread, False, blocks)
def edit_message(self, text: Optional[str], channel_id: str, message_ts: str, blocks: Optional[List[dict]] = None):
"""
Edits a message.
"""
# Check that text exists if there are no blocks
if blocks is None and text is None:
raise ValueError("Must provide blocks or texts or both.")
elif text is None:
text = "_Block message. Open slack client to view_"
# Set the default args
kwargs = {"ts": message_ts, "channel": channel_id, "text": text, "as_user": True}
# Set blocks if provided
if blocks is not None:
kwargs["blocks"] = blocks
return self.api_call("chat.update", **kwargs)
# Update slack data
def update_channels(self):
"""
Queries the slack API for all current channels
"""
# Necessary because of pagination
cursor = None
# Make a new dict to use
new_dict = {}
# Iterate over results
while True:
# Set args depending on if a cursor exists
args = {"limit": 1000, "types": "public_channel,private_channel,mpim,im"}
if cursor:
args["cursor"] = cursor
channel_dicts = self.api_call("conversations.list", **args)
# If the response is good, put its results to the dict
if channel_dicts["ok"]:
for channel_dict in channel_dicts["channels"]:
if channel_dict["is_im"]:
new_channel = slack_util.DirectMessage(id=channel_dict["id"],
user_id="@" + channel_dict["user"])
else:
new_channel = slack_util.Channel(id=channel_dict["id"],
name="#" + channel_dict["name"])
new_dict[new_channel.id] = new_channel
# Fetch the cursor
cursor = channel_dicts.get("response_metadata").get("next_cursor")
# If cursor is blank, we're done new channels, just give it up
if cursor == "":
break
else:
logging.warning("Failed to retrieve channels. Message: {}".format(channel_dicts))
break
self.conversations = new_dict
def update_users(self):
"""
Queries the slack API for all current users
"""
# Necessary because of pagination
cursor = None
while True:
# Set args depending on if a cursor exists
args = {"limit": 1000}
if cursor:
args["cursor"] = cursor
user_dicts = self.api_call("users.list", **args)
# Make a new dict to use
new_dict = {}
# If the response is good:
if user_dicts["ok"]:
for user_dict in user_dicts["members"]:
new_user = slack_util.User(id=user_dict.get("id"),
name=user_dict.get("name"),
real_name=user_dict.get("real_name"),
email=user_dict.get("profile").get("email"))
new_dict[new_user.id] = new_user
# Fetch the cursor
cursor = user_dicts.get("response_metadata").get("next_cursor")
# If cursor is blank, we're done new channels, just give it up
if cursor == "":
break
else:
logging.warning("Warning: failed to retrieve users")
break
self.users = new_dict
# Create a single instance of the client wrapper
_singleton = ClientWrapper(SLACK_API)
def get_slack() -> ClientWrapper:
return _singleton
"""
Miscellania
"""
A = TypeVar("A")
B = TypeVar("B")
C = TypeVar("C")
# Prints exceptions instead of silently dropping them in async tasks
async def _exception_printing_task(c: Coroutine[A, B, C]) -> Coroutine[A, B, C]:
# Print exceptions as they pass through
try:
return await c
except Exception:
output = traceback.format_exc()
print(output)
get_slack().send_message(output, "#botzone")
raise

View File

@ -1,4 +1,7 @@
slackclient
python-Levenshtein
fuzzywuzzy
httplib2
slackclient==1.3.2
python-Levenshtein
fuzzywuzzy
httplib2
aiohttp
google-api-python-client
oauth2client

View File

@ -1,85 +1,85 @@
"""
Examples provided by google for using their api.
Very slightly modified by me to easily just get credentials
"""
from googleapiclient.discovery import build
from httplib2 import Http
from oauth2client import file, client, tools
# If modifying these scopes, delete your previously saved credentials
# at ~/.credentials/sheets.googleapis.com-python-quickstart.json
SCOPES = 'https://www.googleapis.com/auth/spreadsheets'
APPLICATION_NAME = 'SlickSlacker'
def _init_sheets_service():
store = file.Storage('sheets_token.json')
creds = store.get()
if not creds or creds.invalid:
flow = client.flow_from_clientsecrets('sheets_credentials.json', SCOPES)
creds = tools.run_flow(flow, store)
service = build('sheets', 'v4', http=creds.authorize(Http()))
return service
_global_sheet_service = _init_sheets_service()
# range should be of format 'SHEET NAME!A1:Z9'
def get_sheet_range(spreadsheet_id, sheet_range):
"""
Gets an array of the desired table
"""
result = _global_sheet_service.spreadsheets().values().get(spreadsheetId=spreadsheet_id,
range=sheet_range).execute()
values = result.get('values', [])
if not values:
return []
else:
return values
def set_sheet_range(spreadsheet_id, sheet_range, values):
"""
Set an array in the desired table
"""
body = {
"values": values
}
result = _global_sheet_service.spreadsheets().values().update(spreadsheetId=spreadsheet_id,
range=sheet_range,
valueInputOption="RAW",
body=body).execute()
return result
def get_calendar_credentials():
"""Gets valid user credentials from storage.
If nothing has been stored, or if the stored credentials are invalid,
the OAuth2 flow is completed to obtain the new credentials.
Returns:
Credentials, the obtained credential.
"""
"""
home_dir = os.path.expanduser('~')
credential_dir = os.path.join(home_dir, '.credentials')
if not os.path.exists(credential_dir):
os.makedirs(credential_dir)
credential_path = os.path.join(credential_dir, 'calendar-python-quickstart.json')
store = oauth2client.file.Storage(credential_path)
credentials = store.get()
if not credentials or credentials.invalid:
flow = client.flow_from_clientsecrets(CLIENT_SECRET_FILE, SCOPES)
flow.user_agent = APPLICATION_NAME
if flags:
credentials = tools.run_flow(flow, store, flags)
else: # Needed only for compatibility with Python 2.6
credentials = tools.run(flow, store)
print('Storing credentials to ' + credential_path)
return credentials
"""
raise NotImplementedError("This isn't going to work")
"""
Examples provided by google for using their api.
Very slightly modified by me to easily just get credentials
"""
from googleapiclient.discovery import build
from httplib2 import Http
from oauth2client import file, client, tools
# If modifying these scopes, delete your previously saved credentials
# at ~/.credentials/sheets.googleapis.com-python-quickstart.json
SCOPES = 'https://www.googleapis.com/auth/spreadsheets'
APPLICATION_NAME = 'SlickSlacker'
def _init_sheets_service():
store = file.Storage('sheets_token.json')
creds = store.get()
if not creds or creds.invalid:
flow = client.flow_from_clientsecrets('sheets_credentials.json', SCOPES)
creds = tools.run_flow(flow, store)
service = build('sheets', 'v4', http=creds.authorize(Http()))
return service
_global_sheet_service = _init_sheets_service()
# range should be of format 'SHEET NAME!A1:Z9'
def get_sheet_range(spreadsheet_id, sheet_range):
"""
Gets an array of the desired table
"""
result = _global_sheet_service.spreadsheets().values().get(spreadsheetId=spreadsheet_id,
range=sheet_range).execute()
values = result.get('values', [])
if not values:
return []
else:
return values
def set_sheet_range(spreadsheet_id, sheet_range, values):
"""
Set an array in the desired table
"""
body = {
"values": values
}
result = _global_sheet_service.spreadsheets().values().update(spreadsheetId=spreadsheet_id,
range=sheet_range,
valueInputOption="RAW",
body=body).execute()
return result
def get_calendar_credentials():
"""Gets valid user credentials from storage.
If nothing has been stored, or if the stored credentials are invalid,
the OAuth2 flow is completed to obtain the new credentials.
Returns:
Credentials, the obtained credential.
"""
"""
home_dir = os.path.expanduser('~')
credential_dir = os.path.join(home_dir, '.credentials')
if not os.path.exists(credential_dir):
os.makedirs(credential_dir)
credential_path = os.path.join(credential_dir, 'calendar-python-quickstart.json')
store = oauth2client.file.Storage(credential_path)
credentials = store.get()
if not credentials or credentials.invalid:
flow = client.flow_from_clientsecrets(CLIENT_SECRET_FILE, SCOPES)
flow.user_agent = APPLICATION_NAME
if flags:
credentials = tools.run_flow(flow, store, flags)
else: # Needed only for compatibility with Python 2.6
credentials = tools.run(flow, store)
print('Storing credentials to ' + credential_path)
return credentials
"""
raise NotImplementedError("This isn't going to work")

446
hooks.py
View File

@ -1,223 +1,223 @@
from __future__ import annotations
import logging
import re
from time import time
from typing import Match, Any, Coroutine, Callable, Optional, Union, List, TypeVar, Dict
import slack_util
# Return type of an event callback
MsgAction = Coroutine[Any, Any, None]
# Type signature of a message event callback function, for convenience
MsgCallback = Callable[[slack_util.Event, Match], MsgAction]
"""
Hooks
"""
# Signal exception to be raised when a hook has died
class HookDeath(Exception):
pass
# Abstract hook parent class
class AbsHook(object):
def __init__(self, consumes_applicable: bool):
# Whether or not messages that yield a coroutine should not be checked further
self.consumes = consumes_applicable
def try_apply(self, event: slack_util.Event) -> Optional[MsgAction]:
raise NotImplementedError()
class ChannelHook(AbsHook):
"""
Hook that handles messages in a variety of channels.
Guarantees prescence of Post, Related, Conversation, and User
"""
def __init__(self,
callback: MsgCallback,
patterns: Union[str, List[str]],
channel_whitelist: Optional[List[str]] = None,
channel_blacklist: Optional[List[str]] = None,
consumer: bool = True,
allow_dms: bool = True):
super(ChannelHook, self).__init__(consumer)
# Save all
if not isinstance(patterns, list):
patterns = [patterns]
self.patterns = patterns
self.channel_whitelist = channel_whitelist
self.channel_blacklist = channel_blacklist
self.callback = callback
self.allows_dms = allow_dms
# Remedy some sensible defaults
if self.channel_blacklist is None:
self.channel_blacklist = ["#general"]
elif self.channel_whitelist is None:
pass # We leave as none to show no whitelisting in effect
else:
raise ValueError("Cannot whitelist and blacklist")
def try_apply(self, event: slack_util.Event) -> Optional[MsgAction]:
"""
Returns whether a message should be handled by this dict, returning a Match if so, or None
"""
# Ensure that this is an event in a specific channel, with a text component
if not (event.conversation and event.was_post and event.message and event.user):
return None
# Fail if pattern invalid
match = None
for p in self.patterns:
match = re.match(p, event.message.text.strip(), flags=re.IGNORECASE)
if match is not None:
break
if match is None:
return None
# Get the channel name
if isinstance(event.conversation.get_conversation(), slack_util.Channel):
channel_name = event.conversation.get_conversation().name
elif self.allows_dms:
channel_name = "DIRECT_MSG"
else:
return None
# Fail if whitelist defined, and we aren't there
if self.channel_whitelist is not None and channel_name not in self.channel_whitelist:
return None
# Fail if blacklist defined, and we are there
if self.channel_blacklist is not None and channel_name in self.channel_blacklist:
return None
return self.callback(event, match)
class ReplyWaiter(AbsHook):
"""
A special hook that only cares about replies to a given message.
Guarantees presence of Post, Message, Thread, User, and Conversation.
As such, it ignores bots.
"""
def __init__(self, callback: MsgCallback, pattern: str, thread_ts: str, lifetime: float):
super().__init__(True)
self.callback = callback
self.pattern = pattern
self.thread_ts = thread_ts
self.lifetime = lifetime
self.start_time = time()
self.dead = False
def try_apply(self, event: slack_util.Event) -> Optional[MsgAction]:
# First check: are we dead of age yet?
time_alive = time() - self.start_time
should_expire = time_alive > self.lifetime
# If so, give up the ghost
if self.dead or should_expire:
raise HookDeath()
# Next make sure we're actually a message
if not (event.was_post and event.was_post and event.thread and event.conversation and event.user):
return None
# Otherwise proceed normally
# Is the msg the one we care about? If not, ignore
if event.thread.thread_ts != self.thread_ts:
return None
# Does it match the regex? if not, ignore
match = re.match(self.pattern, event.message.text.strip(), flags=re.IGNORECASE)
if match:
self.dead = True
return self.callback(event, match)
else:
return None
# The associated generic type of a button - value mapping
ActionVal = TypeVar("ActionVal")
class InteractionListener(AbsHook):
"""
Listens for replies on buttons.
Guarantees Interaction, Message, User, and Conversation
For fields that don't have a value of their own (such as buttons),
one can provide a mapping of action_ids to values.
In either case, the value is fed as a parameter to the callback.
The hook dies after successfully invoking callback.
"""
def __init__(self,
callback: Callable[[slack_util.Event, Union[ActionVal, str]], MsgAction],
action_bindings: Optional[Dict[str, ActionVal]], # The action_id -> value mapping. Value fed to callback
conversation: slack_util.Conversation, # Where the message is posted
message_ts: str, # Which message contains the block we care about
lifetime: float, # How long to keep listening
on_expire: Optional[Callable[[], None]] # Function to call on death by timeout. For instance, if you want to delete the message, or edit it to say "timed out"
):
super().__init__(True)
self.callback = callback
self.bindings = action_bindings
self.conversation = conversation
self.message_ts = message_ts
self.lifetime = lifetime
self.start_time = time()
self.on_expire = on_expire
self.dead = False
def try_apply(self, event: slack_util.Event) -> Optional[MsgAction]:
# First check: are we dead of age yet?
time_alive = time() - self.start_time
should_expire = time_alive > self.lifetime
# If so, give up the ghost
if self.dead or should_expire:
if self.on_expire and should_expire: # Call on_expire callback if we expired and it exists
self.on_expire()
raise HookDeath()
# Next make sure we've got an interaction
if not (event.interaction and event.message and event.user and event.conversation):
return None
# Otherwise proceed normally
# Is the msg the one we care about? If not, ignore
if event.message.ts != self.message_ts or event.conversation.get_conversation() != self.conversation:
return None
# Lookup the binding if we can/need to
value = event.interaction.action_value
if value is None and self.bindings is not None:
value = self.bindings.get(event.interaction.action_id)
# If the value is still none, we have an issue!
if value is None:
logging.error("Couldn't find an appropriate value for interaction {}".format(event.interaction))
return None
# Call the callback
self.dead = True
return self.callback(event, value)
class Passive(object):
"""
Base class for Periodical tasks, such as reminders and stuff
"""
async def run(self) -> None:
# Run this passive routed through the specified slack client.
raise NotImplementedError()
from __future__ import annotations
import logging
import re
from time import time
from typing import Match, Any, Coroutine, Callable, Optional, Union, List, TypeVar, Dict
import slack_util
# Return type of an event callback
MsgAction = Coroutine[Any, Any, None]
# Type signature of a message event callback function, for convenience
MsgCallback = Callable[[slack_util.Event, Match], MsgAction]
"""
Hooks
"""
# Signal exception to be raised when a hook has died
class HookDeath(Exception):
pass
# Abstract hook parent class
class AbsHook(object):
def __init__(self, consumes_applicable: bool):
# Whether or not messages that yield a coroutine should not be checked further
self.consumes = consumes_applicable
def try_apply(self, event: slack_util.Event) -> Optional[MsgAction]:
raise NotImplementedError()
class ChannelHook(AbsHook):
"""
Hook that handles messages in a variety of channels.
Guarantees prescence of Post, Related, Conversation, and User
"""
def __init__(self,
callback: MsgCallback,
patterns: Union[str, List[str]],
channel_whitelist: Optional[List[str]] = None,
channel_blacklist: Optional[List[str]] = None,
consumer: bool = True,
allow_dms: bool = True):
super(ChannelHook, self).__init__(consumer)
# Save all
if not isinstance(patterns, list):
patterns = [patterns]
self.patterns = patterns
self.channel_whitelist = channel_whitelist
self.channel_blacklist = channel_blacklist
self.callback = callback
self.allows_dms = allow_dms
# Remedy some sensible defaults
if self.channel_blacklist is None:
self.channel_blacklist = ["#general"]
elif self.channel_whitelist is None:
pass # We leave as none to show no whitelisting in effect
else:
raise ValueError("Cannot whitelist and blacklist")
def try_apply(self, event: slack_util.Event) -> Optional[MsgAction]:
"""
Returns whether a message should be handled by this dict, returning a Match if so, or None
"""
# Ensure that this is an event in a specific channel, with a text component
if not (event.conversation and event.was_post and event.message and event.user):
return None
# Fail if pattern invalid
match = None
for p in self.patterns:
match = re.match(p, event.message.text.strip(), flags=re.IGNORECASE)
if match is not None:
break
if match is None:
return None
# Get the channel name
if isinstance(event.conversation.get_conversation(), slack_util.Channel):
channel_name = event.conversation.get_conversation().name
elif self.allows_dms:
channel_name = "DIRECT_MSG"
else:
return None
# Fail if whitelist defined, and we aren't there
if self.channel_whitelist is not None and channel_name not in self.channel_whitelist:
return None
# Fail if blacklist defined, and we are there
if self.channel_blacklist is not None and channel_name in self.channel_blacklist:
return None
return self.callback(event, match)
class ReplyWaiter(AbsHook):
"""
A special hook that only cares about replies to a given message.
Guarantees presence of Post, Message, Thread, User, and Conversation.
As such, it ignores bots.
"""
def __init__(self, callback: MsgCallback, pattern: str, thread_ts: str, lifetime: float):
super().__init__(True)
self.callback = callback
self.pattern = pattern
self.thread_ts = thread_ts
self.lifetime = lifetime
self.start_time = time()
self.dead = False
def try_apply(self, event: slack_util.Event) -> Optional[MsgAction]:
# First check: are we dead of age yet?
time_alive = time() - self.start_time
should_expire = time_alive > self.lifetime
# If so, give up the ghost
if self.dead or should_expire:
raise HookDeath()
# Next make sure we're actually a message
if not (event.was_post and event.was_post and event.thread and event.conversation and event.user):
return None
# Otherwise proceed normally
# Is the msg the one we care about? If not, ignore
if event.thread.thread_ts != self.thread_ts:
return None
# Does it match the regex? if not, ignore
match = re.match(self.pattern, event.message.text.strip(), flags=re.IGNORECASE)
if match:
self.dead = True
return self.callback(event, match)
else:
return None
# The associated generic type of a button - value mapping
ActionVal = TypeVar("ActionVal")
class InteractionListener(AbsHook):
"""
Listens for replies on buttons.
Guarantees Interaction, Message, User, and Conversation
For fields that don't have a value of their own (such as buttons),
one can provide a mapping of action_ids to values.
In either case, the value is fed as a parameter to the callback.
The hook dies after successfully invoking callback.
"""
def __init__(self,
callback: Callable[[slack_util.Event, Union[ActionVal, str]], MsgAction],
action_bindings: Optional[Dict[str, ActionVal]], # The action_id -> value mapping. Value fed to callback
conversation: slack_util.Conversation, # Where the message is posted
message_ts: str, # Which message contains the block we care about
lifetime: float, # How long to keep listening
on_expire: Optional[Callable[[], None]] # Function to call on death by timeout. For instance, if you want to delete the message, or edit it to say "timed out"
):
super().__init__(True)
self.callback = callback
self.bindings = action_bindings
self.conversation = conversation
self.message_ts = message_ts
self.lifetime = lifetime
self.start_time = time()
self.on_expire = on_expire
self.dead = False
def try_apply(self, event: slack_util.Event) -> Optional[MsgAction]:
# First check: are we dead of age yet?
time_alive = time() - self.start_time
should_expire = time_alive > self.lifetime
# If so, give up the ghost
if self.dead or should_expire:
if self.on_expire and should_expire: # Call on_expire callback if we expired and it exists
self.on_expire()
raise HookDeath()
# Next make sure we've got an interaction
if not (event.interaction and event.message and event.user and event.conversation):
return None
# Otherwise proceed normally
# Is the msg the one we care about? If not, ignore
if event.message.ts != self.message_ts or event.conversation.get_conversation() != self.conversation:
return None
# Lookup the binding if we can/need to
value = event.interaction.action_value
if value is None and self.bindings is not None:
value = self.bindings.get(event.interaction.action_id)
# If the value is still none, we have an issue!
if value is None:
logging.error("Couldn't find an appropriate value for interaction {}".format(event.interaction))
return None
# Call the callback
self.dead = True
return self.callback(event, value)
class Passive(object):
"""
Base class for Periodical tasks, such as reminders and stuff
"""
async def run(self) -> None:
# Run this passive routed through the specified slack client.
raise NotImplementedError()

0
maifest.scm Normal file
View File

208
main.py
View File

@ -1,103 +1,105 @@
import asyncio
import textwrap
from typing import Match
import hooks
import settings
from plugins import identifier, job_commands, management_commands, periodicals, scroll_util, slavestothemachine
import client
import slack_util
import logging
logging.basicConfig(filename=settings.LOGFILE, filemode="w", level=logging.DEBUG, format='#!# %(levelname)s - %(asctime)s \n%(message)s \n', datefmt='%m/%d/%Y %I:%M:%S %p')
def main() -> None:
wrap = client.get_slack()
# Add scroll handling
wrap.add_hook(scroll_util.scroll_hook)
# Add id handling
wrap.add_hook(identifier.check_hook)
wrap.add_hook(identifier.identify_hook)
wrap.add_hook(identifier.identify_other_hook)
wrap.add_hook(identifier.name_hook)
# Add kill switch
wrap.add_hook(management_commands.reboot_hook)
wrap.add_hook(management_commands.log_hook)
# Add towel rolling
wrap.add_hook(slavestothemachine.count_work_hook)
# wrap.add_hook(slavestothemachine.dump_work_hook)
# Add job management
wrap.add_hook(job_commands.signoff_hook)
wrap.add_hook(job_commands.late_hook)
wrap.add_hook(job_commands.reset_hook)
wrap.add_hook(job_commands.nag_hook)
wrap.add_hook(job_commands.reassign_hook)
wrap.add_hook(job_commands.refresh_hook)
# Add help
wrap.add_hook(hooks.ChannelHook(help_callback, patterns=[r"help", r"bot\s+help"]))
# Add boozebot
# wrap.add_passive(periodicals.ItsTenPM())
# Add automatic updating of users
wrap.add_passive(periodicals.Updatinator(wrap, 120))
# Do test.
wrap.add_passive(periodicals.TestPassive())
# Add nagloop
wrap.add_passive(periodicals.NotifyJobs())
wrap.add_passive(periodicals.RemindJobs())
event_loop = asyncio.get_event_loop()
event_loop.set_debug(settings.USE_ASYNC_DEBUG_MODE)
event_handling = wrap.handle_events()
passive_handling = wrap.run_passives()
both = asyncio.gather(event_handling, passive_handling)
event_loop.run_until_complete(both)
# noinspection PyUnusedLocal
async def help_callback(event: slack_util.Event, match: Match) -> None:
client.get_slack().reply(event, textwrap.dedent("""
Commands are as follows. Note that some only work in certain channels.
"my scroll is number" : Registers your slack account to have a certain scroll, for the purpose of automatic dm's.
"@person has scroll number" : same as above, but for other users. Helpful if they are being obstinate.
"what is my scroll" : Echos back what the bot thinks your scroll is. Largely for debugging.
"what is my name" : Echos back what the bot thinks your name is. Largely for debugging. If you want to change this,
you'll need to fix the "Sorted family tree" file that the bot reads. Sorry.
"channel id #wherever" : Debug command to get a slack channels full ID
"reboot" : Restarts the server.
"signoff John Doe" : Sign off a brother's house job. Will prompt for more information if needed.
"marklate John Doe" : Same as above, but to mark a job as being completed but having been done late.
"reassign John Doe -> James Deer" : Reassign a house job.
"undo signoff John Doe" : Marks a brother's house job as incomplete. Useful if you fucked up.
"nagjobs day" : Notify in general the house jobs for the week.
"reset signoffs" : Clear points for the week, and undo all signoffs. Not frequently useful, admin only.
"refresh points" : Updates house job / signoff points for the week, after manual edits to the sheet. Admin only.
"help" : You're reading it. This is all it does. What do you want from me?
---
Also of note is that in #slavestothemachine, any wording of the format "replaced <number>", or similarly with
"washed", "dried", "rolled", or "flaked", will track your effort for the week.
Github is https://github.com/TheVillageIdiot2/waitonbot
Man in charge is Jacob Henry, but nothing lasts forever.
"""))
# Do not let my efforts fall to waste. Its a pitious legacy but its something, at least, to maybe tide the
# unending flow of work for poor Niko.
# run main
if __name__ == '__main__':
main()
import asyncio
import textwrap
from typing import Match
import hooks
import settings
from plugins import identifier, job_commands, management_commands, periodicals, scroll_util, slavestothemachine
import client
import slack_util
import logging
logging.basicConfig(filename=settings.LOGFILE, filemode="w", level=logging.DEBUG, format='#!# %(levelname)s - %(asctime)s \n%(message)s \n', datefmt='%m/%d/%Y %I:%M:%S %p')
def main() -> None:
wrap = client.get_slack()
# Add scroll handling
wrap.add_hook(scroll_util.scroll_hook)
# Add id handling
wrap.add_hook(identifier.check_hook)
wrap.add_hook(identifier.identify_hook)
wrap.add_hook(identifier.identify_other_hook)
wrap.add_hook(identifier.name_hook)
# Add kill switch
wrap.add_hook(management_commands.reboot_hook)
wrap.add_hook(management_commands.log_hook)
# Add towel rolling
wrap.add_hook(slavestothemachine.count_work_hook)
# wrap.add_hook(slavestothemachine.dump_work_hook)
# Add job management
wrap.add_hook(job_commands.signoff_hook)
wrap.add_hook(job_commands.undo_hook)
wrap.add_hook(job_commands.late_hook)
wrap.add_hook(job_commands.reset_hook)
wrap.add_hook(job_commands.nag_hook)
wrap.add_hook(job_commands.reassign_hook)
wrap.add_hook(job_commands.refresh_hook)
# Add help
wrap.add_hook(hooks.ChannelHook(help_callback, patterns=[r"help", r"bot\s+help"]))
# Add boozebot
# wrap.add_passive(periodicals.ItsTenPM())
# Add automatic updating of users
wrap.add_passive(periodicals.Updatinator(wrap, 120))
# Do test.
wrap.add_passive(periodicals.TestPassive())
# Add nagloop
wrap.add_passive(periodicals.NotifyJobs())
wrap.add_passive(periodicals.RemindJobs())
event_loop = asyncio.get_event_loop()
event_loop.set_debug(settings.USE_ASYNC_DEBUG_MODE)
event_handling = wrap.handle_events()
passive_handling = wrap.run_passives()
both = asyncio.gather(event_handling, passive_handling)
event_loop.run_until_complete(both)
# noinspection PyUnusedLocal
async def help_callback(event: slack_util.Event, match: Match) -> None:
client.get_slack().reply(event, textwrap.dedent("""
Commands are as follows. Note that some only work in certain channels.
"my scroll is number" : Registers your slack account to have a certain scroll, for the purpose of automatic dm's.
"@person has scroll number" : same as above, but for other users. Helpful if they are being obstinate.
"what is my scroll" : Echos back what the bot thinks your scroll is. Largely for debugging.
"what is my name" : Echos back what the bot thinks your name is. Largely for debugging. If you want to change this,
you'll need to fix the "Sorted family tree" file that the bot reads. Sorry.
"channel id #wherever" : Debug command to get a slack channels full ID
"reboot" : Restarts the server.
"signoff John Doe" : Sign off a brother's house job. Will prompt for more information if needed.
"marklate John Doe" : Same as above, but to mark a job as being completed but having been done late.
"reassign John Doe -> James Deer" : Reassign a house job.
"undo signoff John Doe" : Marks a brother's house job as incomplete. Useful if you fucked up.
"reset signoffs" : Clear points for the week, and undo all signoffs. Not frequently useful, admin only.
"refresh points" : Updates house job / signoff points for the week, after manual edits to the sheet. Admin only.
"help" : You're reading it. This is all it does. What do you want from me?
---
Also of note is that in #slavestothemachine, any wording of the format "replaced <number>", or similarly with
"washed", "dried", "rolled", or "flaked", will track your effort for the week.
Github is https://github.com/TheVillageIdiot2/waitonbot
Man in charge is Jacob Henry, but nothing lasts forever.
Edit January 2023: Patched by Andrew Kerekon, 1126. Contact arkerekon@wpi.edu if anything breaks and I'll do my best to fix it!
"""))
# Do not let my efforts fall to waste. Its a pitious legacy but its something, at least, to maybe tide the
# unending flow of work for poor Niko.
# run main
if __name__ == '__main__':
main()

View File

@ -1,301 +1,301 @@
import dataclasses
from dataclasses import dataclass
from datetime import date, timedelta
from typing import Tuple, List, Optional, Any
import google_api
from plugins import scroll_util
SHEET_ID = "1f9p4H7TWPm8rAM4v_qr2Vc6lBiFNEmR-quTY9UtxEBI"
# Note: These ranges use named range feature of google sheets.
# To edit range of jobs, edit the named range in Data -> Named Ranges
job_range = "AllJobs" # Note that the first row is headers
point_range = "PointRange"
# How tolerant of spelling errors in names to be
SHEET_LOOKUP_THRESHOLD = 80.0
JOB_VAL = 1
LATE_VAL = 0.5
MISS_VAL = -1
SIGNOFF_VAL = 0.1
TOWEL_VAL = 0.1
# What to put for a non-signed-off job
SIGNOFF_PLACEHOLDER = "E-SIGNOFF"
NOT_ASSIGNED = "N/A"
@dataclass
class Job(object):
"""
Represents a job in a more internally meaningful way.
"""
name: str
house: str
day_of_week: str
# Extra stuff, interpreted
day: Optional[date]
def pretty_fmt(self) -> str:
return "{} - {} at {}".format(self.name, self.day_of_week, self.house)
@dataclass
class JobAssignment(object):
"""
Tracks a job's assignment and completion
"""
job: Job
assignee: Optional[scroll_util.Brother]
signer: Optional[scroll_util.Brother]
late: bool
bonus: bool
def to_raw(self) -> Tuple[str, str, str, str, str, str, str]:
# Converts this back into a spreadsheet row
signer_name = self.signer.name if self.signer is not None else SIGNOFF_PLACEHOLDER
late = "y" if self.late else "n"
bonus = "y" if self.bonus else "n"
assignee = self.assignee.name if self.assignee else NOT_ASSIGNED
return self.job.name, self.job.house, self.job.day_of_week, assignee, signer_name, late, bonus
@dataclass
class PointStatus(object):
"""
Tracks a brothers points
"""
brother: scroll_util.Brother
job_points: float = 0
signoff_points: float = 0
towel_points: float = 0
work_party_points: float = 0
bonus_points: float = 0
def to_raw(self) -> Tuple[str, float, float, float, float, float]:
# Convert to a row. Also, do some rounding while we're at it
def fmt(x: float):
return round(x, 2)
return (self.brother.name,
fmt(self.job_points),
fmt(self.signoff_points),
fmt(self.towel_points),
fmt(self.work_party_points),
fmt(self.bonus_points),
)
@property
def towel_contribution_count(self) -> int:
return round(self.towel_points / TOWEL_VAL)
@towel_contribution_count.setter
def towel_contribution_count(self, val: int) -> None:
self.towel_points = val * TOWEL_VAL
def strip_all(l: List[str]) -> List[str]:
return [x.strip() for x in l]
async def import_assignments() -> List[Optional[JobAssignment]]:
"""
Imports Jobs and JobAssignments from the sheet. 1:1 row correspondence.
"""
# Get the raw data
job_rows = google_api.get_sheet_range(SHEET_ID, job_range)
# None-out invalid rows (length not at least 4, which includes the 4 most important features)
def fixer(row):
if len(row) == 4:
return strip_all(row + [SIGNOFF_PLACEHOLDER, "n", "n"])
elif len(row) == 5:
return strip_all(row + ["n", "n"])
elif len(row) == 6:
return strip_all(row + ["n"])
elif len(row) == 7:
return strip_all(row)
else:
return None
# Apply the fix
job_rows = [fixer(row) for row in job_rows]
# Now, create jobs
assignments = []
for row in job_rows:
if row is None:
assignments.append(None)
else:
# Breakout list
job_name, location, day, assignee, signer, late, bonus = row
# Figure out when the day actually is, in terms of the date class
day_rank = {
"monday": 0,
"tuesday": 1,
"wednesday": 2,
"thursday": 3,
"friday": 4,
"saturday": 5,
"sunday": 6
}.get(day.lower(), None)
if day_rank is not None:
# Figure out current date day of week, and extrapolate the jobs day of week from there
today = date.today()
today_rank = today.weekday()
days_till = day_rank - today_rank
if days_till <= 0:
days_till += 7
# Now we know what day it is!
job_day = today + timedelta(days=days_till)
else:
# Can't win 'em all
job_day = None
# Create the job
job = Job(name=job_name, house=location, day_of_week=day, day=job_day)
# Now make an assignment for the job
# Find the brother it is assigned to
if assignee is not None and assignee != "" and assignee != NOT_ASSIGNED:
try:
assignee = await scroll_util.find_by_name(assignee, SHEET_LOOKUP_THRESHOLD)
except scroll_util.BrotherNotFound:
# If we can't get one close enough, make a dummy
assignee = scroll_util.Brother(assignee, scroll_util.MISSINGBRO_SCROLL)
else:
assignee = None
# Find the brother who is currently listed as having signed it off
try:
if signer == SIGNOFF_PLACEHOLDER:
signer = None
else:
signer = await scroll_util.find_by_name(signer)
except scroll_util.BrotherNotFound:
# If we can't figure out the name
signer = None
# Make late a bool
late = late == "y"
# Ditto for bonus
bonus = bonus == "y"
# Create the assignment
assignment = JobAssignment(job=job, assignee=assignee, signer=signer, late=late, bonus=bonus)
# Append to job/assignment lists
assignments.append(assignment)
# Git 'em gone
return assignments
async def export_assignments(assigns: List[Optional[JobAssignment]]) -> None:
# Smash to rows
rows = []
for v in assigns:
if v is None:
rows.append([""] * 7)
else:
rows.append(list(v.to_raw()))
# Send to google
google_api.set_sheet_range(SHEET_ID, job_range, rows)
async def import_points() -> (List[str], List[PointStatus]):
# Figure out how many things there are in a point status
field_count = len(dataclasses.fields(PointStatus))
# Get the raw data
point_rows = google_api.get_sheet_range(SHEET_ID, point_range)
# Get the headers
headers = point_rows[0]
point_rows = point_rows[1:]
# Tidy rows up
async def converter(row: List[Any]) -> Optional[PointStatus]:
# If its too long, or empty already, ignore
if len(row) == 0 or len(row) > field_count:
return None
# Ensure its the proper length
while len(row) < field_count:
row: List[Any] = row + [0]
# Ensure all past the first column are float. If can't convert, make 0
for i in range(1, len(row)):
try:
x = float(row[i])
except ValueError:
x = 0
row[i] = x
# Get the brother for the last item
try:
brother = await scroll_util.find_by_name(row[0])
except scroll_util.BrotherNotFound:
brother = scroll_util.Brother(row[0], scroll_util.MISSINGBRO_SCROLL)
# Ok! Now, we just map it directly to a PointStatus
status = PointStatus(brother, *(row[1:]))
return status
# Perform conversion and return
point_statuses = [await converter(row) for row in point_rows]
return headers, point_statuses
def export_points(headers: List[str], points: List[PointStatus]) -> None:
# Smash to rows
rows = [list(point_status.to_raw()) for point_status in points]
rows = [headers] + rows
# Send to google
google_api.set_sheet_range(SHEET_ID, point_range, rows)
def apply_house_points(points: List[PointStatus], assigns: List[Optional[JobAssignment]]):
"""
Modifies the points list to reflect job assignment scores.
Destroys existing values in the column.
Should be called each time we re-export, for validations sake.
"""
# First, eliminate all house points and signoff points
for p in points:
p.job_points = 0
p.signoff_points = 0
# Then, apply each assign
for a in assigns:
# Ignore null assigns
if a is None:
continue
# What modifier should this have?
if a.signer is None:
job_score = MISS_VAL
else:
if a.late:
job_score = LATE_VAL
else:
job_score = JOB_VAL
# Find the corr bro in points
for p in points:
# If we find assignee, add the score, but don't stop looking since we also need to find signer
if p.brother == a.assignee:
p.job_points += job_score
# If we find the signer, add a signoff reward
if p.brother == a.signer:
p.signoff_points += SIGNOFF_VAL
import dataclasses
from dataclasses import dataclass
from datetime import date, timedelta
from typing import Tuple, List, Optional, Any
import google_api
from plugins import scroll_util
SHEET_ID = "1f9p4H7TWPm8rAM4v_qr2Vc6lBiFNEmR-quTY9UtxEBI"
# Note: These ranges use named range feature of google sheets.
# To edit range of jobs, edit the named range in Data -> Named Ranges
job_range = "AllJobs" # Note that the first row is headers
point_range = "PointRange"
# How tolerant of spelling errors in names to be
SHEET_LOOKUP_THRESHOLD = 80.0
JOB_VAL = 1
LATE_VAL = 0.5
MISS_VAL = -1
SIGNOFF_VAL = 0.1
TOWEL_VAL = 0.1
# What to put for a non-signed-off job
SIGNOFF_PLACEHOLDER = "E-SIGNOFF"
NOT_ASSIGNED = "N/A"
@dataclass
class Job(object):
"""
Represents a job in a more internally meaningful way.
"""
name: str
house: str
day_of_week: str
# Extra stuff, interpreted
day: Optional[date]
def pretty_fmt(self) -> str:
return "{} - {} at {}".format(self.name, self.day_of_week, self.house)
@dataclass
class JobAssignment(object):
"""
Tracks a job's assignment and completion
"""
job: Job
assignee: Optional[scroll_util.Brother]
signer: Optional[scroll_util.Brother]
late: bool
bonus: bool
def to_raw(self) -> Tuple[str, str, str, str, str, str, str]:
# Converts this back into a spreadsheet row
signer_name = self.signer.name if self.signer is not None else SIGNOFF_PLACEHOLDER
late = "y" if self.late else "n"
bonus = "y" if self.bonus else "n"
assignee = self.assignee.name if self.assignee else NOT_ASSIGNED
return self.job.name, self.job.house, self.job.day_of_week, assignee, signer_name, late, bonus
@dataclass
class PointStatus(object):
"""
Tracks a brothers points
"""
brother: scroll_util.Brother
job_points: float = 0
signoff_points: float = 0
towel_points: float = 0
work_party_points: float = 0
bonus_points: float = 0
def to_raw(self) -> Tuple[str, float, float, float, float, float]:
# Convert to a row. Also, do some rounding while we're at it
def fmt(x: float):
return round(x, 2)
return (self.brother.name,
fmt(self.job_points),
fmt(self.signoff_points),
fmt(self.towel_points),
fmt(self.work_party_points),
fmt(self.bonus_points),
)
@property
def towel_contribution_count(self) -> int:
return round(self.towel_points / TOWEL_VAL)
@towel_contribution_count.setter
def towel_contribution_count(self, val: int) -> None:
self.towel_points = val * TOWEL_VAL
def strip_all(l: List[str]) -> List[str]:
return [x.strip() for x in l]
async def import_assignments() -> List[Optional[JobAssignment]]:
"""
Imports Jobs and JobAssignments from the sheet. 1:1 row correspondence.
"""
# Get the raw data
job_rows = google_api.get_sheet_range(SHEET_ID, job_range)
# None-out invalid rows (length not at least 4, which includes the 4 most important features)
def fixer(row):
if len(row) == 4:
return strip_all(row + [SIGNOFF_PLACEHOLDER, "n", "n"])
elif len(row) == 5:
return strip_all(row + ["n", "n"])
elif len(row) == 6:
return strip_all(row + ["n"])
elif len(row) == 7:
return strip_all(row)
else:
return None
# Apply the fix
job_rows = [fixer(row) for row in job_rows]
# Now, create jobs
assignments = []
for row in job_rows:
if row is None:
assignments.append(None)
else:
# Breakout list
job_name, location, day, assignee, signer, late, bonus = row
# Figure out when the day actually is, in terms of the date class
day_rank = {
"monday": 0,
"tuesday": 1,
"wednesday": 2,
"thursday": 3,
"friday": 4,
"saturday": 5,
"sunday": 6
}.get(day.lower(), None)
if day_rank is not None:
# Figure out current date day of week, and extrapolate the jobs day of week from there
today = date.today()
today_rank = today.weekday()
days_till = day_rank - today_rank
if days_till <= 0:
days_till += 7
# Now we know what day it is!
job_day = today + timedelta(days=days_till)
else:
# Can't win 'em all
job_day = None
# Create the job
job = Job(name=job_name, house=location, day_of_week=day, day=job_day)
# Now make an assignment for the job
# Find the brother it is assigned to
if assignee is not None and assignee != "" and assignee != NOT_ASSIGNED:
try:
assignee = await scroll_util.find_by_name(assignee, SHEET_LOOKUP_THRESHOLD)
except scroll_util.BrotherNotFound:
# If we can't get one close enough, make a dummy
assignee = scroll_util.Brother(assignee, scroll_util.MISSINGBRO_SCROLL)
else:
assignee = None
# Find the brother who is currently listed as having signed it off
try:
if signer == SIGNOFF_PLACEHOLDER:
signer = None
else:
signer = await scroll_util.find_by_name(signer)
except scroll_util.BrotherNotFound:
# If we can't figure out the name
signer = None
# Make late a bool
late = late == "y"
# Ditto for bonus
bonus = bonus == "y"
# Create the assignment
assignment = JobAssignment(job=job, assignee=assignee, signer=signer, late=late, bonus=bonus)
# Append to job/assignment lists
assignments.append(assignment)
# Git 'em gone
return assignments
async def export_assignments(assigns: List[Optional[JobAssignment]]) -> None:
# Smash to rows
rows = []
for v in assigns:
if v is None:
rows.append([""] * 7)
else:
rows.append(list(v.to_raw()))
# Send to google
google_api.set_sheet_range(SHEET_ID, job_range, rows)
async def import_points():
# Figure out how many things there are in a point status
field_count = len(dataclasses.fields(PointStatus))
# Get the raw data
point_rows = google_api.get_sheet_range(SHEET_ID, point_range)
# Get the headers
headers = point_rows[0]
point_rows = point_rows[1:]
# Tidy rows up
async def converter(row: List[Any]) -> Optional[PointStatus]:
# If its too long, or empty already, ignore
if len(row) == 0 or len(row) > field_count:
return None
# Ensure its the proper length
while len(row) < field_count:
row: List[Any] = row + [0]
# Ensure all past the first column are float. If can't convert, make 0
for i in range(1, len(row)):
try:
x = float(row[i])
except ValueError:
x = 0
row[i] = x
# Get the brother for the last item
try:
brother = await scroll_util.find_by_name(row[0])
except scroll_util.BrotherNotFound:
brother = scroll_util.Brother(row[0], scroll_util.MISSINGBRO_SCROLL)
# Ok! Now, we just map it directly to a PointStatus
status = PointStatus(brother, *(row[1:]))
return status
# Perform conversion and return
point_statuses = [await converter(row) for row in point_rows]
return headers, point_statuses
def export_points(headers: List[str], points: List[PointStatus]) -> None:
# Smash to rows
rows = [list(point_status.to_raw()) for point_status in points]
rows = [headers] + rows
# Send to google
google_api.set_sheet_range(SHEET_ID, point_range, rows)
def apply_house_points(points: List[PointStatus], assigns: List[Optional[JobAssignment]]):
"""
Modifies the points list to reflect job assignment scores.
Destroys existing values in the column.
Should be called each time we re-export, for validations sake.
"""
# First, eliminate all house points and signoff points
for p in points:
p.job_points = 0
p.signoff_points = 0
# Then, apply each assign
for a in assigns:
# Ignore null assigns
if a is None:
continue
# What modifier should this have?
if a.signer is None:
job_score = MISS_VAL
else:
if a.late:
job_score = LATE_VAL
else:
job_score = JOB_VAL
# Find the corr bro in points
for p in points:
# If we find assignee, add the score, but don't stop looking since we also need to find signer
if p.brother == a.assignee:
p.job_points += job_score
# If we find the signer, add a signoff reward
if p.brother == a.signer:
p.signoff_points += SIGNOFF_VAL

View File

@ -1,142 +1,142 @@
"""
Allows users to register their user account as a specific scroll
"""
import asyncio
import shelve
from typing import List, Match
import hooks
from plugins import scroll_util
import client
import slack_util
# The following db maps SLACK_USER_ID -> SCROLL_INTEGER
DB_NAME = "user_scrolls"
DB_LOCK = asyncio.Lock()
# Initialize the hooks
NON_REG_MSG = ("You currently have no scroll registered. To register, type\n"
"i am 666\n"
"except with your scroll instead of 666")
async def identify_callback(event: slack_util.Event, match: Match):
"""
Sets the users scroll
"""
async with DB_LOCK:
with shelve.open(DB_NAME) as db:
# Get the query
query = match.group(1).strip()
try:
user = event.user.user_id
scroll = int(query)
db[user] = scroll
result = "Updated user {} to have scroll {}".format(user, scroll)
except ValueError:
result = "Bad scroll: {}".format(query)
# Respond
client.get_slack().reply(event, result)
async def identify_other_callback(event: slack_util.Event, match: Match):
"""
Sets another users scroll
"""
async with DB_LOCK:
with shelve.open(DB_NAME) as db:
# Get the query
user = match.group(1).strip()
scroll_txt = match.group(2).strip()
try:
scroll = int(scroll_txt)
if user in db:
result = "To prevent trolling, once a users id has been set only they can change it"
else:
db[user] = scroll
result = "Updated user {} to have scroll {}".format(user, scroll)
except ValueError:
result = "Bad scroll: {}".format(scroll_txt)
# Respond
client.get_slack().reply(event, result)
# noinspection PyUnusedLocal
async def check_callback(event: slack_util.Event, match: Match):
"""
Replies with the users current scroll assignment
"""
async with DB_LOCK:
# Tells the user their current scroll
with shelve.open(DB_NAME) as db:
try:
scroll = db[event.user.user_id]
result = "You are currently registered with scroll {}".format(scroll)
except KeyError:
result = NON_REG_MSG
client.get_slack().reply(event, result)
# noinspection PyUnusedLocal
async def name_callback(event: slack_util.Event, match: Match):
"""
Tells the user what it thinks the calling users name is.
"""
async with DB_LOCK:
with shelve.open(DB_NAME) as db:
try:
scroll = db[event.user.user_id]
brother = scroll_util.find_by_scroll(scroll)
if brother:
result = "The bot thinks your name is {}".format(brother.name)
else:
result = "The bot couldn't find a name for scroll {}".format(scroll)
except (KeyError, ValueError):
result = NON_REG_MSG
# Respond
client.get_slack().reply(event, result)
async def lookup_slackid_brother(slack_id: str) -> scroll_util.Brother:
"""
Gets whatever brother the userid is registered to
:raises BrotherNotFound:
:return: Brother object or None
"""
async with DB_LOCK:
with shelve.open(DB_NAME) as db:
try:
scroll = db[slack_id]
return scroll_util.find_by_scroll(scroll)
except (KeyError, ValueError):
raise scroll_util.BrotherNotFound("Slack id {} not tied to brother".format(slack_id))
raise scroll_util.BrotherNotFound("Couldn't find an appropriate brother for slackid {}.".format(slack_id))
async def lookup_brother_userids(brother: scroll_util.Brother) -> List[str]:
"""
Returns a list of all userids associated with the given brother.
:param brother: Brother to lookup scrolls for
:return: List of user id strings (may be empty)
"""
async with DB_LOCK:
with shelve.open(DB_NAME) as db:
keys = db.keys()
result = []
for user_id in keys:
if db[user_id] == brother.scroll:
result.append(user_id)
return result
identify_hook = hooks.ChannelHook(identify_callback, patterns=r"my scroll is (.*)")
identify_other_hook = hooks.ChannelHook(identify_other_callback, patterns=r"<@(.*)>\s+has scroll\s+(.*)")
check_hook = hooks.ChannelHook(check_callback, patterns=r"what is my scroll")
name_hook = hooks.ChannelHook(name_callback, patterns=r"what is my name")
"""
Allows users to register their user account as a specific scroll
"""
import asyncio
import shelve
from typing import List, Match
import hooks
from plugins import scroll_util
import client
import slack_util
# The following db maps SLACK_USER_ID -> SCROLL_INTEGER
DB_NAME = "user_scrolls"
DB_LOCK = asyncio.Lock()
# Initialize the hooks
NON_REG_MSG = ("You currently have no scroll registered. To register, type\n"
"i am 666\n"
"except with your scroll instead of 666")
async def identify_callback(event: slack_util.Event, match: Match):
"""
Sets the users scroll
"""
async with DB_LOCK:
with shelve.open(DB_NAME) as db:
# Get the query
query = match.group(1).strip()
try:
user = event.user.user_id
scroll = int(query)
db[user] = scroll
result = "Updated user {} to have scroll {}".format(user, scroll)
except ValueError:
result = "Bad scroll: {}".format(query)
# Respond
client.get_slack().reply(event, result)
async def identify_other_callback(event: slack_util.Event, match: Match):
"""
Sets another users scroll
"""
async with DB_LOCK:
with shelve.open(DB_NAME) as db:
# Get the query
user = match.group(1).strip()
scroll_txt = match.group(2).strip()
try:
scroll = int(scroll_txt)
if user in db:
result = "To prevent trolling, once a users id has been set only they can change it"
else:
db[user] = scroll
result = "Updated user {} to have scroll {}".format(user, scroll)
except ValueError:
result = "Bad scroll: {}".format(scroll_txt)
# Respond
client.get_slack().reply(event, result)
# noinspection PyUnusedLocal
async def check_callback(event: slack_util.Event, match: Match):
"""
Replies with the users current scroll assignment
"""
async with DB_LOCK:
# Tells the user their current scroll
with shelve.open(DB_NAME) as db:
try:
scroll = db[event.user.user_id]
result = "You are currently registered with scroll {}".format(scroll)
except KeyError:
result = NON_REG_MSG
client.get_slack().reply(event, result)
# noinspection PyUnusedLocal
async def name_callback(event: slack_util.Event, match: Match):
"""
Tells the user what it thinks the calling users name is.
"""
async with DB_LOCK:
with shelve.open(DB_NAME) as db:
try:
scroll = db[event.user.user_id]
brother = scroll_util.find_by_scroll(scroll)
if brother:
result = "The bot thinks your name is {}".format(brother.name)
else:
result = "The bot couldn't find a name for scroll {}".format(scroll)
except (KeyError, ValueError):
result = NON_REG_MSG
# Respond
client.get_slack().reply(event, result)
async def lookup_slackid_brother(slack_id: str) -> scroll_util.Brother:
"""
Gets whatever brother the userid is registered to
:raises BrotherNotFound:
:return: Brother object or None
"""
async with DB_LOCK:
with shelve.open(DB_NAME) as db:
try:
scroll = db[slack_id]
return scroll_util.find_by_scroll(scroll)
except (KeyError, ValueError):
raise scroll_util.BrotherNotFound("Slack id {} not tied to brother".format(slack_id))
raise scroll_util.BrotherNotFound("Couldn't find an appropriate brother for slackid {}.".format(slack_id))
async def lookup_brother_userids(brother: scroll_util.Brother) -> List[str]:
"""
Returns a list of all userids associated with the given brother.
:param brother: Brother to lookup scrolls for
:return: List of user id strings (may be empty)
"""
async with DB_LOCK:
with shelve.open(DB_NAME) as db:
keys = db.keys()
result = []
for user_id in keys:
if db[user_id] == brother.scroll:
result.append(user_id)
return result
identify_hook = hooks.ChannelHook(identify_callback, patterns=r"my scroll is (.*)")
identify_other_hook = hooks.ChannelHook(identify_other_callback, patterns=r"<@(.*)>\s+has scroll\s+(.*)")
check_hook = hooks.ChannelHook(check_callback, patterns=r"what is my scroll")
name_hook = hooks.ChannelHook(name_callback, patterns=r"what is my name")

View File

@ -1,424 +1,428 @@
import logging
from dataclasses import dataclass
from typing import List, Match, Callable, TypeVar, Optional, Iterable, Any, Coroutine
from fuzzywuzzy import fuzz
import hooks
from plugins import identifier, house_management, scroll_util
import client
import slack_util
SHEET_ID = "1lPj9GjB00BuIq9GelOWh5GmiGsheLlowPnHLnWBvMOM"
MIN_RATIO = 80.0
async def alert_user(brother: scroll_util.Brother, saywhat: str) -> None:
"""
DM a brother saying something. Wrapper around several simpler methods
"""
# We do this as a for loop just in case multiple people reg. to same scroll for some reason (e.g. dup accounts)
succ = False
for slack_id in await identifier.lookup_brother_userids(brother):
client.get_slack().send_message(saywhat, slack_id)
succ = True
# Warn if we never find
if not succ:
logging.warning("Unable to find dm conversation for brother {}".format(brother))
# Generic type
T = TypeVar("T")
def tiemax(items: Iterable[T], key: Callable[[T], Optional[float]]) -> List[T]:
best = []
best_score = None
for elt in items:
# Compute the score
score = key(elt)
# Ignore blank scores
if score is None:
continue
# Check if its the new best, wiping old if so
elif best_score is None or score > best_score:
best_score = score
best = [elt]
# Check if its same as last best
elif score == best_score:
best.append(elt)
return best
@dataclass
class _ModJobContext:
signer: scroll_util.Brother # The brother invoking the command
assign: house_management.JobAssignment # The job assignment to modify
async def _mod_jobs(event: slack_util.Event,
relevance_scorer: Callable[[house_management.JobAssignment], Optional[float]],
modifier: Callable[[_ModJobContext], Coroutine[Any, Any, None]],
no_job_msg: str = None
) -> None:
"""
Stub function that handles various tasks relating to modifying jobs
:param relevance_scorer: Function scores job assignments on relevance. Determines which gets modified
:param modifier: Callback function to modify a job. Only called on a successful operation, and only on one job
"""
# Make an error wrapper
verb = slack_util.VerboseWrapper(event)
# Who invoked this command?
signer = await verb(event.user.as_user().get_brother())
# Get all of the assignments
assigns = await verb(house_management.import_assignments())
# Find closest assignment to what we're after. This just wraps relevance_scorer to handle nones.
def none_scorer(a: Optional[house_management.JobAssignment]) -> Optional[float]:
if a is None:
return None
else:
return relevance_scorer(a)
closest_assigns = tiemax(assigns, key=none_scorer)
# This is what we do on success. It will or won't be called immediately based on what's in closest_assigns
async def success_callback(targ_assign: house_management.JobAssignment) -> None:
# First get the most up to date version of the jobs
fresh_assigns = await verb(house_management.import_assignments())
# Find the one that matches what we had before
fresh_targ_assign = fresh_assigns[fresh_assigns.index(targ_assign)]
# Create the context
context = _ModJobContext(signer, fresh_targ_assign)
# Modify it
await modifier(context)
# Re-upload
await house_management.export_assignments(fresh_assigns)
# Also import and update points
headers, points = await house_management.import_points()
house_management.apply_house_points(points, fresh_assigns)
house_management.export_points(headers, points)
# If there aren't any jobs, say so
if len(closest_assigns) == 0:
if no_job_msg is None:
no_job_msg = "Unable to find any jobs to apply this command to. Try again with better spelling or whatever."
client.get_slack().reply(event, no_job_msg)
# If theres only one job, sign it off
elif len(closest_assigns) == 1:
await success_callback(closest_assigns[0])
# If theres multiple jobs, we need to get a follow up!
else:
# Say we need more info
job_list = "\n".join("{}: {}".format(i, a.job.pretty_fmt()) for i, a in enumerate(closest_assigns))
client.get_slack().reply(event, "Multiple relevant job listings found.\n"
"Please enter the number corresponding to the job "
"you wish to modify:\n{}".format(job_list))
# Establish a follow up command pattern
pattern = r"\d+"
# Make the follow up callback
async def foc(_event: slack_util.Event, _match: Match) -> None:
# Get the number out
index = int(_match.group(0))
# Check that its valid
if 0 <= index < len(closest_assigns):
# We now know what we're trying to sign off!
await success_callback(closest_assigns[index])
else:
# They gave a bad index, or we were unable to find the assignment again.
client.get_slack().reply(_event, "Invalid job index / job unable to be found.")
# Make a listener hook
new_hook = hooks.ReplyWaiter(foc, pattern, event.message.ts, 120)
# Register it
client.get_slack().add_hook(new_hook)
async def signoff_callback(event: slack_util.Event, match: Match) -> None:
verb = slack_util.VerboseWrapper(event)
# Find out who we are trying to sign off is
signee_name = match.group(1)
signee = await verb(scroll_util.find_by_name(signee_name, MIN_RATIO))
# Score by name similarity, only accepting non-assigned jobs
def scorer(assign: house_management.JobAssignment):
if assign.assignee is not None:
r = fuzz.ratio(signee.name, assign.assignee.name)
if assign.signer is None and r > MIN_RATIO:
return r
# Set the assigner, and notify
async def modifier(context: _ModJobContext):
context.assign.signer = context.signer
# Say we did it wooo!
client.get_slack().reply(event, "Signed off {} for {}".format(context.assign.assignee.name,
context.assign.job.name))
await alert_user(context.assign.assignee, "{} signed you off for {}.".format(context.assign.signer.name,
context.assign.job.pretty_fmt()))
# Fire it off
await _mod_jobs(event, scorer, modifier)
async def undo_callback(event: slack_util.Event, match: Match) -> None:
verb = slack_util.VerboseWrapper(event)
# Find out who we are trying to sign off is
signee_name = match.group(1)
signee = await verb(scroll_util.find_by_name(signee_name, MIN_RATIO))
# Score by name similarity, only accepting jobs that are signed off
def scorer(assign: house_management.JobAssignment):
if assign.assignee is not None:
r = fuzz.ratio(signee.name, assign.assignee.name)
if assign.signer is not None and r > MIN_RATIO:
return r
# Set the assigner to be None, and notify
async def modifier(context: _ModJobContext):
context.assign.signer = None
# Say we did it wooo!
client.get_slack().reply(event, "Undid signoff of {} for {}".format(context.assign.assignee.name,
context.assign.job.name))
await alert_user(context.assign.assignee, "{} undid your signoff off for {}.\n"
"Must have been a mistake".format(context.assign.signer.name,
context.assign.job.pretty_fmt()))
# Fire it off
await _mod_jobs(event, scorer, modifier)
async def late_callback(event: slack_util.Event, match: Match) -> None:
verb = slack_util.VerboseWrapper(event)
# Find out who we are trying to sign off is
signee_name = match.group(1)
signee = await verb(scroll_util.find_by_name(signee_name, MIN_RATIO))
# Score by name similarity. Don't care if signed off or not
def scorer(assign: house_management.JobAssignment):
if assign.assignee is not None:
r = fuzz.ratio(signee.name, assign.assignee.name)
if r > MIN_RATIO:
return r
# Just set the assigner
async def modifier(context: _ModJobContext):
context.assign.late = not context.assign.late
# Say we did it
client.get_slack().reply(event, "Toggled lateness of {}.\n"
"Now marked as late: {}".format(context.assign.job.pretty_fmt(),
context.assign.late))
# Fire it off
await _mod_jobs(event, scorer, modifier)
async def reassign_callback(event: slack_util.Event, match: Match) -> None:
verb = slack_util.VerboseWrapper(event)
# Find out our two targets
from_name = match.group(1).strip()
to_name = match.group(2).strip()
# Get them as brothers
from_bro = await verb(scroll_util.find_by_name(from_name, MIN_RATIO))
to_bro = await verb(scroll_util.find_by_name(to_name, MIN_RATIO))
# Score by name similarity to the first brother. Don't care if signed off or not,
# as we want to be able to transfer even after signoffs (why not, amirite?)
def scorer(assign: house_management.JobAssignment):
if assign.assignee is not None:
r = fuzz.ratio(from_bro.name, assign.assignee.name)
if r > MIN_RATIO:
return r
# Change the assignee
async def modifier(context: _ModJobContext):
context.assign.assignee = to_bro
# Say we did it
reassign_msg = "Job {} reassigned from {} to {}".format(context.assign.job.pretty_fmt(),
from_bro,
to_bro)
client.get_slack().reply(event, reassign_msg)
# Tell the people
reassign_msg = "Job {} reassigned from {} to {}".format(context.assign.job.pretty_fmt(),
from_bro,
to_bro)
await alert_user(from_bro, reassign_msg)
await alert_user(to_bro, reassign_msg)
# Fire it off
await _mod_jobs(event, scorer, modifier)
# noinspection PyUnusedLocal
async def reset_callback(event: slack_util.Event, match: Match) -> None:
"""
Resets the scores.
"""
# Unassign everything
assigns = await house_management.import_assignments()
for a in assigns:
if a is not None:
a.signer = None
await house_management.export_assignments(assigns)
# Now wipe points
headers, points = house_management.import_points()
# Set to 0/default
for i in range(len(points)):
new = house_management.PointStatus(brother=points[i].brother)
points[i] = new
house_management.apply_house_points(points, await house_management.import_assignments())
house_management.export_points(headers, points)
client.get_slack().reply(event, "Reset scores and signoffs")
# noinspection PyUnusedLocal
async def refresh_callback(event: slack_util.Event, match: Match) -> None:
headers, points = await house_management.import_points()
house_management.apply_house_points(points, await house_management.import_assignments())
house_management.export_points(headers, points)
client.get_slack().reply(event, "Force updated point values")
async def nag_callback(event: slack_util.Event, match: Match) -> None:
# Get the day
day = match.group(1).lower().strip()
if not await nag_jobs(day):
client.get_slack().reply(event,
"No jobs found. Check that the day is spelled correctly, with no extra symbols.\n"
"It is possible that all jobs have been signed off, as well.",
in_thread=True)
# Wrapper so we can auto-call this as well
async def nag_jobs(day_of_week: str) -> bool:
# Get the assigns
assigns = await house_management.import_assignments()
# Filter to day
assigns = [assign for assign in assigns if assign is not None and assign.job.day_of_week.lower() == day_of_week]
# Filter signed off
assigns = [assign for assign in assigns if assign.signer is None]
# If no jobs found, somethings up. Probably mispelled day. Return failure
if not assigns:
return False
# Nag each
response = "Do yer jerbs! They are as follows:\n"
for assign in assigns:
# Make the row template
if assign.assignee is None:
continue
response += "({}) {} -- {} ".format(assign.job.house, assign.job.name, assign.assignee.name)
# Find the people to @
brother_slack_ids = await identifier.lookup_brother_userids(assign.assignee)
if brother_slack_ids:
for slack_id in brother_slack_ids:
response += "<@{}> ".format(slack_id)
else:
response += "(scroll missing. Please register for @ pings!)"
response += "\n"
general_id = client.get_slack().get_conversation_by_name("#general").id
client.get_slack().send_message(response, general_id)
return True
signoff_hook = hooks.ChannelHook(signoff_callback,
patterns=[
r"signoff\s+(.*)",
r"sign off\s+(.*)",
],
channel_whitelist=["#housejobs"])
undo_hook = hooks.ChannelHook(undo_callback,
patterns=[
r"unsignoff\s+(.*)",
r"undosignoff\s+(.*)",
r"undo signoff\s+(.*)",
],
channel_whitelist=["#housejobs"])
late_hook = hooks.ChannelHook(late_callback,
patterns=[
r"marklate\s+(.*)",
r"mark late\s+(.*)",
],
channel_whitelist=["#housejobs"])
reset_hook = hooks.ChannelHook(reset_callback,
patterns=[
r"reset signoffs",
r"reset sign offs",
],
channel_whitelist=["#command-center"])
nag_hook = hooks.ChannelHook(nag_callback,
patterns=[
r"nagjobs\s+(.*)",
r"nag jobs\s+(.*)"
],
channel_whitelist=["#command-center"])
reassign_hook = hooks.ChannelHook(reassign_callback,
patterns=r"reassign\s+(.*?)-&gt;\s+(.+)",
channel_whitelist=["#housejobs"])
refresh_hook = hooks.ChannelHook(refresh_callback,
patterns=[
"refresh points",
"update points"
],
channel_whitelist=["#command-center"])
block_action = """
[
{
"type": "actions",
"block_id": "test_block_id",
"elements": [
{
"type": "button",
"action_id": "test_action_id",
"text": {
"type": "plain_text",
"text": "Send payload",
"emoji": false
}
}
]
}
]
"""
import logging
from dataclasses import dataclass
from typing import List, Match, Callable, TypeVar, Optional, Iterable, Any, Coroutine
from fuzzywuzzy import fuzz
import hooks
from plugins import identifier, house_management, scroll_util
import client
import slack_util
SHEET_ID = "1f9p4H7TWPm8rAM4v_qr2Vc6lBiFNEmR-quTY9UtxEBI"
MIN_RATIO = 80.0
async def alert_user(brother: scroll_util.Brother, saywhat: str) -> None:
"""
DM a brother saying something. Wrapper around several simpler methods
"""
# We do this as a for loop just in case multiple people reg. to same scroll for some reason (e.g. dup accounts)
succ = False
for slack_id in await identifier.lookup_brother_userids(brother):
client.get_slack().send_message(saywhat, slack_id)
succ = True
# Warn if we never find
if not succ:
logging.warning("Unable to find dm conversation for brother {}".format(brother))
# Generic type
T = TypeVar("T")
def tiemax(items: Iterable[T], key: Callable[[T], Optional[float]]) -> List[T]:
best = []
best_score = None
for elt in items:
# Compute the score
score = key(elt)
# Ignore blank scores
if score is None:
continue
# Check if its the new best, wiping old if so
elif best_score is None or score > best_score:
best_score = score
best = [elt]
# Check if its same as last best
elif score == best_score:
best.append(elt)
return best
@dataclass
class _ModJobContext:
signer: scroll_util.Brother # The brother invoking the command
assign: house_management.JobAssignment # The job assignment to modify
async def _mod_jobs(event: slack_util.Event,
relevance_scorer: Callable[[house_management.JobAssignment], Optional[float]],
modifier: Callable[[_ModJobContext], Coroutine[Any, Any, None]],
no_job_msg: str = None
) -> None:
"""
Stub function that handles various tasks relating to modifying jobs
:param relevance_scorer: Function scores job assignments on relevance. Determines which gets modified
:param modifier: Callback function to modify a job. Only called on a successful operation, and only on one job
"""
# Make an error wrapper
verb = slack_util.VerboseWrapper(event)
# Who invoked this command?
signer = await verb(event.user.as_user().get_brother())
# Get all of the assignments
assigns = await verb(house_management.import_assignments())
# Find closest assignment to what we're after. This just wraps relevance_scorer to handle nones.
def none_scorer(a: Optional[house_management.JobAssignment]) -> Optional[float]:
if a is None:
return None
else:
return relevance_scorer(a)
closest_assigns = tiemax(assigns, key=none_scorer)
# This is what we do on success. It will or won't be called immediately based on what's in closest_assigns
async def success_callback(targ_assign: house_management.JobAssignment) -> None:
# First get the most up to date version of the jobs
fresh_assigns = await verb(house_management.import_assignments())
# Find the one that matches what we had before
fresh_targ_assign = fresh_assigns[fresh_assigns.index(targ_assign)]
# Create the context
context = _ModJobContext(signer, fresh_targ_assign)
# Modify it
await modifier(context)
# Re-upload
await house_management.export_assignments(fresh_assigns)
# Also import and update points
headers, points = await house_management.import_points()
house_management.apply_house_points(points, fresh_assigns)
house_management.export_points(headers, points)
# If there aren't any jobs, say so
if len(closest_assigns) == 0:
if no_job_msg is None:
no_job_msg = "Unable to find any jobs to apply this command to. Try again with better spelling or whatever."
client.get_slack().reply(event, no_job_msg)
# If theres only one job, sign it off
elif len(closest_assigns) == 1:
await success_callback(closest_assigns[0])
# If theres multiple jobs, we need to get a follow up!
else:
# Say we need more info
job_list = "\n".join("{}: {}".format(i, a.job.pretty_fmt()) for i, a in enumerate(closest_assigns))
client.get_slack().reply(event, "Multiple relevant job listings found.\n"
"Please enter the number corresponding to the job "
"you wish to modify:\n{}".format(job_list))
# Establish a follow up command pattern
pattern = r"\d+"
# Make the follow up callback
async def foc(_event: slack_util.Event, _match: Match) -> None:
# Get the number out
index = int(_match.group(0))
# Check that its valid
if 0 <= index < len(closest_assigns):
# We now know what we're trying to sign off!
await success_callback(closest_assigns[index])
else:
# They gave a bad index, or we were unable to find the assignment again.
client.get_slack().reply(_event, "Invalid job index / job unable to be found.")
# Make a listener hook
new_hook = hooks.ReplyWaiter(foc, pattern, event.message.ts, 120)
# Register it
client.get_slack().add_hook(new_hook)
async def signoff_callback(event: slack_util.Event, match: Match) -> None:
verb = slack_util.VerboseWrapper(event)
print("Receving a signoff request!!!")
# Find out who we are trying to sign off is
signee_name = match.group(1)
signee = await verb(scroll_util.find_by_name(signee_name, MIN_RATIO))
# Score by name similarity, only accepting non-assigned jobs
def scorer(assign: house_management.JobAssignment):
if assign.assignee is not None:
r = fuzz.ratio(signee.name, assign.assignee.name)
if assign.signer is None and r > MIN_RATIO:
return r
# Set the assigner, and notify
async def modifier(context: _ModJobContext):
context.assign.signer = context.signer
# Say we did it wooo!
client.get_slack().reply(event, "Signed off {} for {}".format(context.assign.assignee.name,
context.assign.job.name))
await alert_user(context.assign.assignee, "{} signed you off for {}.".format(context.assign.signer.name,
context.assign.job.pretty_fmt()))
# Fire it off
await _mod_jobs(event, scorer, modifier)
async def undo_callback(event: slack_util.Event, match: Match) -> None:
verb = slack_util.VerboseWrapper(event)
print("Receving an unsignoff request!!!")
# Find out who we are trying to sign off is
signee_name = match.group(1)
signee = await verb(scroll_util.find_by_name(signee_name, MIN_RATIO))
# Score by name similarity, only accepting jobs that are signed off
def scorer(assign: house_management.JobAssignment):
if assign.assignee is not None:
r = fuzz.ratio(signee.name, assign.assignee.name)
if assign.signer is not None and r > MIN_RATIO:
return r
# Set the assigner to be None, and notify
async def modifier(context: _ModJobContext):
context.assign.signer = None
# Say we did it wooo!
client.get_slack().reply(event, "Undid signoff of {} for {}".format(context.assign.assignee.name,
context.assign.job.name))
await alert_user(context.assign.assignee, "{} undid your signoff off for {}.\n")
# Fire it off
await _mod_jobs(event, scorer, modifier)
async def late_callback(event: slack_util.Event, match: Match) -> None:
verb = slack_util.VerboseWrapper(event)
# Find out who we are trying to sign off is
signee_name = match.group(1)
signee = await verb(scroll_util.find_by_name(signee_name, MIN_RATIO))
# Score by name similarity. Don't care if signed off or not
def scorer(assign: house_management.JobAssignment):
if assign.assignee is not None:
r = fuzz.ratio(signee.name, assign.assignee.name)
if r > MIN_RATIO:
return r
# Just set the assigner
async def modifier(context: _ModJobContext):
context.assign.late = not context.assign.late
# Say we did it
client.get_slack().reply(event, "Toggled lateness of {}.\n"
"Now marked as late: {}".format(context.assign.job.pretty_fmt(),
context.assign.late))
# Fire it off
await _mod_jobs(event, scorer, modifier)
async def reassign_callback(event: slack_util.Event, match: Match) -> None:
verb = slack_util.VerboseWrapper(event)
# Find out our two targets
from_name = match.group(1).strip()
to_name = match.group(2).strip()
# Get them as brothers
from_bro = await verb(scroll_util.find_by_name(from_name, MIN_RATIO))
to_bro = await verb(scroll_util.find_by_name(to_name, MIN_RATIO))
# Score by name similarity to the first brother. Don't care if signed off or not,
# as we want to be able to transfer even after signoffs (why not, amirite?)
def scorer(assign: house_management.JobAssignment):
if assign.assignee is not None:
r = fuzz.ratio(from_bro.name, assign.assignee.name)
if r > MIN_RATIO:
return r
# Change the assignee
async def modifier(context: _ModJobContext):
context.assign.assignee = to_bro
# Say we did it
reassign_msg = "Job {} reassigned from {} to {}".format(context.assign.job.pretty_fmt(),
from_bro,
to_bro)
client.get_slack().reply(event, reassign_msg)
# Tell the people
reassign_msg = "Job {} reassigned from {} to {}".format(context.assign.job.pretty_fmt(),
from_bro,
to_bro)
await alert_user(from_bro, reassign_msg)
await alert_user(to_bro, reassign_msg)
# Fire it off
await _mod_jobs(event, scorer, modifier)
# noinspection PyUnusedLocal
async def reset_callback(event: slack_util.Event, match: Match) -> None:
"""
Resets the scores.
"""
# Unassign everything
assigns = await house_management.import_assignments()
for a in assigns:
if a is not None:
a.signer = None
await house_management.export_assignments(assigns)
# Now wipe points
headers, points = house_management.import_points()
# Set to 0/default
for i in range(len(points)):
new = house_management.PointStatus(brother=points[i].brother)
points[i] = new
house_management.apply_house_points(points, await house_management.import_assignments())
house_management.export_points(headers, points)
client.get_slack().reply(event, "Reset scores and signoffs")
# noinspection PyUnusedLocal
async def refresh_callback(event: slack_util.Event, match: Match) -> None:
print("Received request to reset points")
headers, points = await house_management.import_points()
print("Received request to reset points 1")
house_management.apply_house_points(points, await house_management.import_assignments())
print("Received request to reset points 2")
house_management.export_points(headers, points)
print("Received request to reset points 3")
client.get_slack().reply(event, "Force updated point values")
async def nag_callback(event: slack_util.Event, match: Match) -> None:
# Get the day
day = match.group(1).lower().strip()
if not await nag_jobs(day):
client.get_slack().reply(event,
"No jobs found. Check that the day is spelled correctly, with no extra symbols.\n"
"It is possible that all jobs have been signed off, as well.",
in_thread=True)
# Wrapper so we can auto-call this as well
async def nag_jobs(day_of_week: str) -> bool:
# Get the assigns
assigns = await house_management.import_assignments()
# Filter to day
assigns = [assign for assign in assigns if assign is not None and assign.job.day_of_week.lower() == day_of_week]
# Filter signed off
assigns = [assign for assign in assigns if assign.signer is None]
# If no jobs found, somethings up. Probably mispelled day. Return failure
if not assigns:
return False
# Nag each
response = "Do yer jerbs! They are as follows:\n"
for assign in assigns:
# Make the row template
if assign.assignee is None:
continue
response += "({}) {} -- {} ".format(assign.job.house, assign.job.name, assign.assignee.name)
# Find the people to @
brother_slack_ids = await identifier.lookup_brother_userids(assign.assignee)
if brother_slack_ids:
for slack_id in brother_slack_ids:
response += "<@{}> ".format(slack_id)
else:
response += "(scroll missing. Please register for @ pings!)"
response += "\n"
general_id = client.get_slack().get_conversation_by_name("#general").id
client.get_slack().send_message(response, general_id)
return True
signoff_hook = hooks.ChannelHook(signoff_callback,
patterns=[
r"signoff\s+(.*)",
r"sign off\s+(.*)",
],
channel_whitelist=["#housejobs"])
undo_hook = hooks.ChannelHook(undo_callback,
patterns=[
r"unsignoff\s+(.*)",
r"undosignoff\s+(.*)",
r"undo signoff\s+(.*)",
],
channel_whitelist=["#housejobs"])
late_hook = hooks.ChannelHook(late_callback,
patterns=[
r"marklate\s+(.*)",
r"mark late\s+(.*)",
],
channel_whitelist=["#housejobs"])
reset_hook = hooks.ChannelHook(reset_callback,
patterns=[
r"reset signoffs",
r"reset sign offs",
],
channel_whitelist=["#command-center"])
nag_hook = hooks.ChannelHook(nag_callback,
patterns=[
r"nagjobs\s+(.*)",
r"nag jobs\s+(.*)"
],
channel_whitelist=["#command-center"])
reassign_hook = hooks.ChannelHook(reassign_callback,
patterns=r"reassign\s+(.*?)-&gt;\s+(.+)",
channel_whitelist=["#housejobs"])
refresh_hook = hooks.ChannelHook(refresh_callback,
patterns=[
"refresh points",
"update points"
],
channel_whitelist=["#command-center"])
block_action = """
[
{
"type": "actions",
"block_id": "test_block_id",
"elements": [
{
"type": "button",
"action_id": "test_action_id",
"text": {
"type": "plain_text",
"text": "Send payload",
"emoji": false
}
}
]
}
]
"""

View File

@ -1,67 +1,67 @@
from typing import Match
import hooks
import client
import settings
import slack_util
# Gracefully reboot to reload code changes
# noinspection PyUnusedLocal
async def reboot_callback(event: slack_util.Event, match: Match) -> None:
response = "Ok. Rebooting..."
client.get_slack().reply(event, response)
exit(0)
async def post_log_callback(event: slack_util.Event, match: Match) -> None:
# Get the last n lines of log of the specified severity or higher
count = 100
lines = []
# numerically rank the debug severity
severity_codex = {
"CRITICAL": 50,
"ERROR": 40,
"WARNING": 30,
"INFO": 20,
"DEBUG": 10,
"NOTSET": 0
}
curr_rating = 0
# Get the min rating if one exists
min_rating = 0
rating_str = match.group(1).upper().strip()
for severity_name, severity_value in severity_codex.items():
if severity_name in rating_str:
min_rating = severity_value
break
with open(settings.LOGFILE, 'r') as f:
for line in f:
# Update the current rating if necessary
if line[:3] == "#!#":
for k, v in severity_codex.items():
if k in line:
curr_rating = v
break
# Add the line if its severity is at or above the required minimum
if curr_rating >= min_rating:
lines.append(line)
if len(lines) > count:
del lines[0]
# Spew them out
client.get_slack().reply(event, "```" + ''.join(lines) + "```")
# Make hooks
reboot_hook = hooks.ChannelHook(reboot_callback,
patterns=r"reboot",
channel_whitelist=["#command-center"])
log_hook = hooks.ChannelHook(post_log_callback,
patterns=["post logs(.*)", "logs(.*)", "post_logs(.*)"],
channel_whitelist=["#botzone"])
from typing import Match
import hooks
import client
import settings
import slack_util
# Gracefully reboot to reload code changes
# noinspection PyUnusedLocal
async def reboot_callback(event: slack_util.Event, match: Match) -> None:
response = "Ok. Rebooting..."
client.get_slack().reply(event, response)
exit(0)
async def post_log_callback(event: slack_util.Event, match: Match) -> None:
# Get the last n lines of log of the specified severity or higher
count = 100
lines = []
# numerically rank the debug severity
severity_codex = {
"CRITICAL": 50,
"ERROR": 40,
"WARNING": 30,
"INFO": 20,
"DEBUG": 10,
"NOTSET": 0
}
curr_rating = 0
# Get the min rating if one exists
min_rating = 0
rating_str = match.group(1).upper().strip()
for severity_name, severity_value in severity_codex.items():
if severity_name in rating_str:
min_rating = severity_value
break
with open(settings.LOGFILE, 'r') as f:
for line in f:
# Update the current rating if necessary
if line[:3] == "#!#":
for k, v in severity_codex.items():
if k in line:
curr_rating = v
break
# Add the line if its severity is at or above the required minimum
if curr_rating >= min_rating:
lines.append(line)
if len(lines) > count:
del lines[0]
# Spew them out
client.get_slack().reply(event, "```" + ''.join(lines) + "```")
# Make hooks
reboot_hook = hooks.ChannelHook(reboot_callback,
patterns=r"reboot",
channel_whitelist=["#command-center"])
log_hook = hooks.ChannelHook(post_log_callback,
patterns=["post logs(.*)", "logs(.*)", "post_logs(.*)"],
channel_whitelist=["#botzone"])

View File

@ -1,230 +1,232 @@
import asyncio
import logging
from datetime import datetime, timedelta
from typing import Optional, List
import hooks
import slack_util
from plugins import identifier, job_commands, house_management
import client
def seconds_until(target: datetime) -> float:
curr = datetime.now()
# Compute seconds betwixt
delta = target - curr
ds = delta.seconds
# Lower bound to 0
if ds < 0:
return 0
else:
return delta.seconds
class ItsTenPM(hooks.Passive):
async def run(self) -> None:
while True:
# Get 10PM
ten_pm = datetime.now().replace(hour=22, minute=0, second=0)
# Find out how long until it, then sleep that long
delay = seconds_until(ten_pm)
await asyncio.sleep(delay)
# Crow like a rooster
client.get_slack().send_message("IT'S 10 PM!", client
.get_slack()
.get_conversation_by_name("#random").id)
# Wait a while before trying it again, to prevent duplicates
await asyncio.sleep(60)
# Shared behaviour
class JobNotifier:
@staticmethod
def get_day_of_week(time) -> str:
"""
Gets the current day of week as a str
"""
return ["Monday",
"Tuesday",
"Wednesday",
"Thursday",
"Friday",
"Saturday",
"Sunday"][time.weekday()]
@staticmethod
def is_job_valid(a: Optional[house_management.JobAssignment]):
# If it doesn't exist, it a thot
if a is None:
return False
# If its not today, we shouldn't nag
if a.job.day_of_week.lower() != JobNotifier.get_day_of_week(datetime.now()).lower():
return False
# If it is unassigned, we can't nag
if a.assignee is None:
return False
# If its been signed off, no need to nag
if a.signer is not None:
return False
# If the brother wasn't recognized, don't try nagging
if not a.assignee.is_valid():
return False
return True
class NotifyJobs(hooks.Passive, JobNotifier):
# Auto-does the nag jobs thing
async def run(self) -> None:
while True:
# Get the "Start" of the current day (Say, 10AM)
next_remind_time = datetime.now().replace(hour=10, minute=00, second=0)
# If we've accidentally made it in the past somehow, bump it up one date
while datetime.now() > next_remind_time:
next_remind_time += timedelta(days=1)
# Sleep until that time
delay = seconds_until(next_remind_time)
await asyncio.sleep(delay)
# Now it is that time. Nag the jobs
await job_commands.nag_jobs(self.get_day_of_week(next_remind_time))
# Sleep for a bit to prevent double shots
await asyncio.sleep(10)
class RemindJobs(hooks.Passive, JobNotifier):
async def run(self) -> None:
while True:
# Get the end of the current day (Say, 10PM)
next_remind_time = datetime.now().replace(hour=22, minute=00, second=0)
# If we've accidentally made it in the past somehow, bump it up one date
while datetime.now() > next_remind_time:
next_remind_time += timedelta(days=1)
# Sleep until that time
delay = seconds_until(next_remind_time)
await asyncio.sleep(delay)
# Now it is that time. Get the current jobs
assigns = await house_management.import_assignments()
# Filter to incomplete, and today
assigns: List[house_management.JobAssignment] = [a for a in assigns if self.is_job_valid(a)]
# Now, we want to nag each person. If we don't actually know who they are, so be it.
logging.info("Scheduled reminding people who haven't yet done their jobs.")
for a in assigns:
# Get the relevant slack ids
assignee_ids = await identifier.lookup_brother_userids(a.assignee)
# For each, send them a DM
success = False
for slack_id in assignee_ids:
msg = "{}, you still need to do {}".format(a.assignee.name, a.job.pretty_fmt())
success = True
client.get_slack().send_message(msg, slack_id)
# Warn on failure
if not success:
logging.warning("Tried to nag {} but couldn't find their slack id".format(a.assignee.name))
# Take a break to ensure no double-shots
await asyncio.sleep(10)
class Updatinator(hooks.Passive):
"""
Periodically updates the channels and users in the slack
"""
def __init__(self, wrapper_to_update: client.ClientWrapper, interval_seconds: int):
self.wrapper_target = wrapper_to_update
self.interval = interval_seconds
async def run(self):
# Give time to warmup
while True:
self.wrapper_target.update_channels()
self.wrapper_target.update_users()
await asyncio.sleep(self.interval)
class TestPassive(hooks.Passive):
"""
Stupid shit
"""
async def run(self) -> None:
lifespan = 60
post_interval = 60
def make_interactive_msg():
# Send the message and recover the ts
response = client.get_slack().send_message("Select an option:", "#botzone", blocks=[
{
"type": "actions",
"block_id": "button_test",
"elements": [
{
"type": "button",
"action_id": "alpha_button",
"text": {
"type": "plain_text",
"text": "Alpha",
"emoji": False
}
},
{
"type": "button",
"action_id": "beta_button",
"text": {
"type": "plain_text",
"text": "Beta",
"emoji": False
}
}
]
}
])
msg_ts = response["ts"]
botzone = client.get_slack().get_conversation_by_name("#botzone")
# Make our mappings
button_responses = {
"alpha_button": "You clicked alpha. Good work.",
"beta_button": "You clicked beta. You must be so proud."
}
# Make our callbacks
async def on_click(event: slack_util.Event, response_str: str):
# Edit the message to show the result.
client.get_slack().edit_message(response_str, event.conversation.conversation_id, event.message.ts, [])
def on_expire():
# Edit the message to show defeat.
client.get_slack().edit_message("Timed out", botzone.id, msg_ts, [])
# Add a listener
listener = hooks.InteractionListener(on_click,
button_responses,
botzone,
msg_ts,
lifespan,
on_expire)
client.get_slack().add_hook(listener)
# Iterate editing the message every n seconds, for quite some time
for i in range(10):
make_interactive_msg()
await asyncio.sleep(post_interval)
def __init__(self):
pass
import asyncio
import logging
from datetime import datetime, timedelta
from typing import Optional, List
import hooks
import slack_util
from plugins import identifier, job_commands, house_management
import client
def seconds_until(target: datetime) -> float:
curr = datetime.now()
# Compute seconds betwixt
delta = target - curr
ds = delta.seconds
# Lower bound to 0
if ds < 0:
return 0
else:
return delta.seconds
class ItsTenPM(hooks.Passive):
async def run(self) -> None:
while True:
# Get 10PM
ten_pm = datetime.now().replace(hour=22, minute=0, second=0)
# Find out how long until it, then sleep that long
delay = seconds_until(ten_pm)
await asyncio.sleep(delay)
# Crow like a rooster
client.get_slack().send_message("IT'S 10 PM!", client
.get_slack()
.get_conversation_by_name("#random").id)
# Wait a while before trying it again, to prevent duplicates
await asyncio.sleep(60)
# Shared behaviour
class JobNotifier:
@staticmethod
def get_day_of_week(time) -> str:
"""
Gets the current day of week as a str
"""
return ["Monday",
"Tuesday",
"Wednesday",
"Thursday",
"Friday",
"Saturday",
"Sunday"][time.weekday()]
@staticmethod
def is_job_valid(a: Optional[house_management.JobAssignment]):
# If it doesn't exist, it a thot
if a is None:
return False
# If its not today, we shouldn't nag
if a.job.day_of_week.lower() != JobNotifier.get_day_of_week(datetime.now()).lower():
return False
# If it is unassigned, we can't nag
if a.assignee is None:
return False
# If its been signed off, no need to nag
if a.signer is not None:
return False
# If the brother wasn't recognized, don't try nagging
if not a.assignee.is_valid():
return False
return True
class NotifyJobs(hooks.Passive, JobNotifier):
# Auto-does the nag jobs thing
async def run(self) -> None:
while True:
# Get the "Start" of the current day (Say, 10AM)
next_remind_time = datetime.now().replace(hour=10, minute=00, second=0)
# If we've accidentally made it in the past somehow, bump it up one date
while datetime.now() > next_remind_time:
next_remind_time += timedelta(days=1)
# Sleep until that time
delay = seconds_until(next_remind_time)
await asyncio.sleep(delay)
# Now it is that time. Nag the jobs
await job_commands.nag_jobs(self.get_day_of_week(next_remind_time))
# Sleep for a bit to prevent double shots
await asyncio.sleep(10)
class RemindJobs(hooks.Passive, JobNotifier):
async def run(self) -> None:
while True:
# Get the end of the current day (Say, 10PM)
next_remind_time = datetime.now().replace(hour=22, minute=00, second=0)
# If we've accidentally made it in the past somehow, bump it up one date
while datetime.now() > next_remind_time:
next_remind_time += timedelta(days=1)
# Sleep until that time
delay = seconds_until(next_remind_time)
await asyncio.sleep(delay)
# Now it is that time. Get the current jobs
assigns = await house_management.import_assignments()
# Filter to incomplete, and today
assigns: List[house_management.JobAssignment] = [a for a in assigns if self.is_job_valid(a)]
# Now, we want to nag each person. If we don't actually know who they are, so be it.
logging.info("Scheduled reminding people who haven't yet done their jobs.")
for a in assigns:
# Get the relevant slack ids
assignee_ids = await identifier.lookup_brother_userids(a.assignee)
# For each, send them a DM
success = False
for slack_id in assignee_ids:
msg = "{}, you still need to do {}".format(a.assignee.name, a.job.pretty_fmt())
success = True
client.get_slack().send_message(msg, slack_id)
# Warn on failure
if not success:
logging.warning("Tried to nag {} but couldn't find their slack id".format(a.assignee.name))
# Take a break to ensure no double-shots
await asyncio.sleep(10)
class Updatinator(hooks.Passive):
"""
Periodically updates the channels and users in the slack
"""
def __init__(self, wrapper_to_update: client.ClientWrapper, interval_seconds: int):
self.wrapper_target = wrapper_to_update
self.interval = interval_seconds
async def run(self):
# Give time to warmup
while True:
self.wrapper_target.update_channels()
self.wrapper_target.update_users()
await asyncio.sleep(self.interval)
class TestPassive(hooks.Passive):
"""
Stupid shit
"""
async def run(self) -> None:
lifespan = 60
post_interval = 60
def make_interactive_msg():
# Send the message and recover the ts
response = client.get_slack().send_message("Select an option:", "#botzone", blocks=[
{
"type": "actions",
"block_id": "button_test",
"elements": [
{
"type": "button",
"action_id": "alpha_button",
"text": {
"type": "plain_text",
"text": "Alpha",
"emoji": False
}
},
{
"type": "button",
"action_id": "beta_button",
"text": {
"type": "plain_text",
"text": "Beta",
"emoji": False
}
}
]
}
])
msg_ts = response["ts"]
botzone = client.get_slack().get_conversation_by_name("#botzone")
# Make our mappings
button_responses = {
"alpha_button": "You clicked alpha. Good work.",
"beta_button": "You clicked beta. You must be so proud."
}
# Make our callbacks
async def on_click(event: slack_util.Event, response_str: str):
# Edit the message to show the result.
client.get_slack().edit_message(response_str, event.conversation.conversation_id, event.message.ts, [])
def on_expire():
# Edit the message to show defeat.
client.get_slack().edit_message("Timed out", botzone.id, msg_ts, [])
# Add a listener
listener = hooks.InteractionListener(on_click,
button_responses,
botzone,
msg_ts,
lifespan,
on_expire)
client.get_slack().add_hook(listener)
# Iterate editing the message every n seconds, for quite some time
for i in range(10):
make_interactive_msg()
await asyncio.sleep(post_interval)
def __init__(self):
pass

View File

@ -1,111 +1,111 @@
from __future__ import annotations
"""
This file contains util for scroll polling
Only really kept separate for neatness sake.
"""
import re
from dataclasses import dataclass
from typing import List, Optional, Match
from fuzzywuzzy import process
import hooks
import client
import slack_util
# Use this if we can't figure out who a brother actually is
MISSINGBRO_SCROLL = -1
@dataclass
class Brother(object):
"""
Represents a brother.
"""
name: str
scroll: int
def is_valid(self):
return self.scroll is not MISSINGBRO_SCROLL
# load the family tree
familyfile = open("sortedfamilytree.txt", 'r')
# Parse out
brother_match = re.compile(r"([0-9]*)~(.*)")
brothers_matches = [brother_match.match(line) for line in familyfile]
brothers_matches = [m for m in brothers_matches if m]
brothers: List[Brother] = [Brother(m.group(2), int(m.group(1))) for m in brothers_matches]
async def scroll_callback(event: slack_util.Event, match: Match) -> None:
"""
Finds the scroll of a brother, or the brother of a scroll, based on msg text.
"""
# Get the query
query = match.group(1).strip()
# Try to get as int or by name
try:
sn = int(query)
result = find_by_scroll(sn)
except ValueError:
result = await find_by_name(query)
if result:
result = "Brother {} has scroll {}".format(result.name, result.scroll)
else:
result = "Couldn't find brother {}".format(query)
# Respond
client.get_slack().reply(event, result)
def find_by_scroll(scroll: int) -> Optional[Brother]:
"""
Lookups a brother in the family list, using their scroll.
:param scroll: The integer scroll to look up
:return: The brother, or None
"""
for b in brothers:
if b.scroll == scroll:
return b
return None
# Used to track a sufficiently shitty typed name
class BrotherNotFound(Exception):
"""Throw when we can't find the desired brother."""
pass
async def find_by_name(name: str, threshold: Optional[float] = None) -> Brother:
"""
Looks up a brother by name. Raises exception if threshold provided and not met.
:param threshold: Minimum match ratio to accept. Can be none.
:param name: The name to look up, with a fuzzy search
:raises BrotherNotFound:
:return: The best-match brother
""" # Get all of the names
all_names = [b.name for b in brothers]
# Do fuzzy match
found, score = process.extractOne(name, all_names)
found_index = all_names.index(found)
found_brother = brothers[found_index]
if (not threshold) or score > threshold:
return found_brother
else:
msg = "Couldn't find brother {}. Best match \"{}\" matched with accuracy {}, falling short of safety minimum " \
"accuracy {}. Please type name more accurately, to prevent misfires.".format(name,
found_brother,
score,
threshold)
raise BrotherNotFound(msg)
scroll_hook = hooks.ChannelHook(scroll_callback, patterns=r"scroll\s+(.*)")
from __future__ import annotations
"""
This file contains util for scroll polling
Only really kept separate for neatness sake.
"""
import re
from dataclasses import dataclass
from typing import List, Optional, Match
from fuzzywuzzy import process
import hooks
import client
import slack_util
# Use this if we can't figure out who a brother actually is
MISSINGBRO_SCROLL = -1
@dataclass
class Brother(object):
"""
Represents a brother.
"""
name: str
scroll: int
def is_valid(self):
return self.scroll is not MISSINGBRO_SCROLL
# load the family tree
familyfile = open("sortedfamilytree.txt", 'r')
# Parse out
brother_match = re.compile(r"([0-9]*)~(.*)")
brothers_matches = [brother_match.match(line) for line in familyfile]
brothers_matches = [m for m in brothers_matches if m]
brothers: List[Brother] = [Brother(m.group(2), int(m.group(1))) for m in brothers_matches]
async def scroll_callback(event: slack_util.Event, match: Match) -> None:
"""
Finds the scroll of a brother, or the brother of a scroll, based on msg text.
"""
# Get the query
query = match.group(1).strip()
# Try to get as int or by name
try:
sn = int(query)
result = find_by_scroll(sn)
except ValueError:
result = await find_by_name(query)
if result:
result = "Brother {} has scroll {}".format(result.name, result.scroll)
else:
result = "Couldn't find brother {}".format(query)
# Respond
client.get_slack().reply(event, result)
def find_by_scroll(scroll: int) -> Optional[Brother]:
"""
Lookups a brother in the family list, using their scroll.
:param scroll: The integer scroll to look up
:return: The brother, or None
"""
for b in brothers:
if b.scroll == scroll:
return b
return None
# Used to track a sufficiently shitty typed name
class BrotherNotFound(Exception):
"""Throw when we can't find the desired brother."""
pass
async def find_by_name(name: str, threshold: Optional[float] = None) -> Brother:
"""
Looks up a brother by name. Raises exception if threshold provided and not met.
:param threshold: Minimum match ratio to accept. Can be none.
:param name: The name to look up, with a fuzzy search
:raises BrotherNotFound:
:return: The best-match brother
""" # Get all of the names
all_names = [b.name for b in brothers]
# Do fuzzy match
found, score = process.extractOne(name, all_names)
found_index = all_names.index(found)
found_brother = brothers[found_index]
if (not threshold) or score > threshold:
return found_brother
else:
msg = "Couldn't find brother {}. Best match \"{}\" matched with accuracy {}, falling short of safety minimum " \
"accuracy {}. Please type name more accurately, to prevent misfires.".format(name,
found_brother,
score,
threshold)
raise BrotherNotFound(msg)
scroll_hook = hooks.ChannelHook(scroll_callback, patterns=r"scroll\s+(.*)")

View File

@ -1,101 +1,101 @@
import re
import textwrap
from typing import Match
import hooks
from plugins import house_management
import client
import slack_util
from plugins.scroll_util import Brother
counted_data = ["flaked", "rolled", "replaced", "washed", "dried"]
lookup_format = "{}\s+(\d+)"
def fmt_work_dict(work_dict: dict) -> str:
return ",\n".join(["{} × {}".format(job, count) for job, count in sorted(work_dict.items())])
# noinspection PyUnusedLocal
async def count_work_callback(event: slack_util.Event, match: Match) -> None:
# If no user, continue
if event.user is None:
return
# If bot, continue
if event.bot is not None:
return
# Make an error wrapper
verb = slack_util.VerboseWrapper(event)
# Tidy the text
text = event.message.text.strip().lower()
# Couple things to work through.
# One: Who sent the message?
who_wrote = await verb(event.user.as_user().get_brother())
who_wrote_label = "{} [{}]".format(who_wrote.name, who_wrote.scroll)
# Two: What work did they do?
new_work = {}
for job in counted_data:
pattern = lookup_format.format(job)
match = re.search(pattern, text)
if match:
new_work[job] = int(match.group(1))
# Three: check if we found anything
if len(new_work) == 0:
if re.search(r'\s\d\s', text) is not None:
client.get_slack().reply(event,
"If you were trying to record work, it was not recognized.\n"
"Use words {} or work will not be recorded".format(counted_data))
return
# Four: Knowing they did something, record to total work
contribution_count = sum(new_work.values())
new_total = await verb(record_towel_contribution(who_wrote, contribution_count))
# Five, congratulate them on their work!
congrats = textwrap.dedent("""{} recorded work:
{}
Net increase in points: {}
Total points since last reset: {}""".format(who_wrote_label,
fmt_work_dict(new_work),
contribution_count,
new_total))
client.get_slack().reply(event, congrats)
async def record_towel_contribution(for_brother: Brother, contribution_count: int) -> int:
"""
Grants <count> contribution point to the specified user.
Returns the new total.
"""
# Import house points
headers, points = await house_management.import_points()
# Find the brother
for p in points:
if p is None or p.brother != for_brother:
continue
# If found, mog with more points
p.towel_contribution_count += contribution_count
# Export
house_management.export_points(headers, points)
# Return the new total
return p.towel_contribution_count
# If not found, get mad!
raise KeyError("No score entry found for brother {}".format(for_brother))
# Make dem HOOKs
count_work_hook = hooks.ChannelHook(count_work_callback,
patterns=".*",
channel_whitelist=["#slavestothemachine"],
consumer=False)
import re
import textwrap
from typing import Match
import hooks
from plugins import house_management
import client
import slack_util
from plugins.scroll_util import Brother
counted_data = ["flaked", "rolled", "replaced", "washed", "dried"]
lookup_format = "{}\s+(\d+)"
def fmt_work_dict(work_dict: dict) -> str:
return ",\n".join(["{} × {}".format(job, count) for job, count in sorted(work_dict.items())])
# noinspection PyUnusedLocal
async def count_work_callback(event: slack_util.Event, match: Match) -> None:
# If no user, continue
if event.user is None:
return
# If bot, continue
if event.bot is not None:
return
# Make an error wrapper
verb = slack_util.VerboseWrapper(event)
# Tidy the text
text = event.message.text.strip().lower()
# Couple things to work through.
# One: Who sent the message?
who_wrote = await verb(event.user.as_user().get_brother())
who_wrote_label = "{} [{}]".format(who_wrote.name, who_wrote.scroll)
# Two: What work did they do?
new_work = {}
for job in counted_data:
pattern = lookup_format.format(job)
match = re.search(pattern, text)
if match:
new_work[job] = int(match.group(1))
# Three: check if we found anything
if len(new_work) == 0:
if re.search(r'\s\d\s', text) is not None:
client.get_slack().reply(event,
"If you were trying to record work, it was not recognized.\n"
"Use words {} or work will not be recorded".format(counted_data))
return
# Four: Knowing they did something, record to total work
contribution_count = sum(new_work.values())
new_total = await verb(record_towel_contribution(who_wrote, contribution_count))
# Five, congratulate them on their work!
congrats = textwrap.dedent("""{} recorded work:
{}
Net increase in points: {}
Total points since last reset: {}""".format(who_wrote_label,
fmt_work_dict(new_work),
contribution_count,
new_total))
client.get_slack().reply(event, congrats)
async def record_towel_contribution(for_brother: Brother, contribution_count: int) -> int:
"""
Grants <count> contribution point to the specified user.
Returns the new total.
"""
# Import house points
headers, points = await house_management.import_points()
# Find the brother
for p in points:
if p is None or p.brother != for_brother:
continue
# If found, mog with more points
p.towel_contribution_count += contribution_count
# Export
house_management.export_points(headers, points)
# Return the new total
return p.towel_contribution_count
# If not found, get mad!
raise KeyError("No score entry found for brother {}".format(for_brother))
# Make dem HOOKs
count_work_hook = hooks.ChannelHook(count_work_callback,
patterns=".*",
channel_whitelist=["#slavestothemachine"],
consumer=False)

View File

@ -1,14 +1,14 @@
#!/bin/bash
#This is just a basic loop to run in the background on my raspi.
#Restarts the bot if it dies, and notifies me
while :
do
git pull
echo "Press [CTRL+C] to stop..."
sleep 1
python3 -u main.py
sleep 1
echo "Died. Updating and restarting..."
done
#!/bin/bash
#This is just a basic loop to run in the background on my raspi.
#Restarts the bot if it dies, and notifies me
while :
do
git pull
echo "Press [CTRL+C] to stop..."
sleep 1
python3 -u main.py
sleep 1
echo "Died. Updating and restarting..."
done

View File

@ -1,13 +1,13 @@
# Use if we want to await tasks one by one
# Hint: We usually don't
SINGLE_THREAD_TASKS = False
# If we were interested in performance, this should probably be turned off. However, it's fairly harmless and
# for the most part pretty beneficial to us.
# See https://docs.python.org/3/library/asyncio-dev.html#asyncio-debug-mode
# Note that this occasionally will give us warnings if SINGLE_THREAD_TAKS is False (which it usually is)
# howver, these warnings are harmless, as regardless of if aa task is awaited it still does its job
USE_ASYNC_DEBUG_MODE = False
# Use if we want to await tasks one by one
# Hint: We usually don't
SINGLE_THREAD_TASKS = False
# If we were interested in performance, this should probably be turned off. However, it's fairly harmless and
# for the most part pretty beneficial to us.
# See https://docs.python.org/3/library/asyncio-dev.html#asyncio-debug-mode
# Note that this occasionally will give us warnings if SINGLE_THREAD_TAKS is False (which it usually is)
# howver, these warnings are harmless, as regardless of if aa task is awaited it still does its job
USE_ASYNC_DEBUG_MODE = False
LOGFILE = "run.log"

View File

@ -1,221 +1,221 @@
from __future__ import annotations
import logging
from dataclasses import dataclass
from pprint import pformat
from time import sleep
from typing import Optional, Generator, Callable, Union, Awaitable
from typing import TypeVar
from slackclient import SlackClient
from slackclient.client import SlackNotConnected
import client
import plugins
"""
Objects to represent things within a slack workspace
"""
# noinspection PyUnresolvedReferences
@dataclass
class User:
id: str
name: str
real_name: Optional[str]
email: Optional[str]
async def get_brother(self) -> Optional[plugins.scroll_util.Brother]:
"""
Try to find the brother corresponding to this user.
"""
return await plugins.identifier.lookup_slackid_brother(self.id)
@dataclass
class Channel:
id: str
name: str
@dataclass
class DirectMessage:
id: str
user_id: str
def get_user(self) -> Optional[User]:
"""
Lookup the user to which this DM corresponds.
"""
return client.get_slack().get_user(self.user_id)
Conversation = Union[Channel, DirectMessage]
"""
Objects to represent attributes an event may contain
"""
@dataclass
class Event:
# Info on if this event ocurred in a particular conversation/channel, and if so which one
conversation: Optional[ConversationContext] = None
# Info on if a particular user caused this event, and if so which one
user: Optional[UserContext] = None
# Info on if this event was a someone posting a message. For contents see related
was_post: Optional[PostMessageContext] = None
# The content of the most relevant message to this event
message: Optional[RelatedMessageContext] = None
# Info if this event was threaded on a parent message, and if so what that message was
thread: Optional[ThreadContext] = None
# Info if this event was an interaction, and if so with what
interaction: Optional[InteractionContext] = None
# Info about regarding if bot caused this event, and if so which one
bot: Optional[BotContext] = None
# If this was posted in a specific channel or conversation
@dataclass
class ConversationContext:
conversation_id: str
def get_conversation(self) -> Optional[Conversation]:
return client.get_slack().get_conversation(self.conversation_id)
# If there is a specific user associated with this event
@dataclass
class UserContext:
user_id: str
def as_user(self) -> Optional[User]:
return client.get_slack().get_user(self.user_id)
# Same but for bots
@dataclass
class BotContext:
bot_id: str
# Whether this was a newly posted message
@dataclass
class PostMessageContext:
pass
# Whether this event was related to a particular message, but not specifically posting it.
# To see if they posted it, check for PostMessageContext
@dataclass
class RelatedMessageContext:
ts: str
text: str
# Whether or not this is a threadable text message
@dataclass
class ThreadContext:
thread_ts: str
@dataclass
class InteractionContext:
response_url: str # Used to confirm/respond to requests
trigger_id: str # Used to open popups
block_id: str # Identifies the block of the interacted component
action_id: str # Identifies the interacted component
action_value: Optional[str] # Identifies the selected value in the component. None for buttons
# If a file was additionally shared
@dataclass
class File:
pass
"""
Objects for interfacing easily with rtm steams, and handling async events
"""
def message_stream(slack: SlackClient) -> Generator[Event, None, None]:
"""
Generator that yields messages from slack.
Messages are in standard api format, look it up.
Checks on 2 second intervals (may be changed)
"""
# Do forever
while True:
try:
if slack.rtm_connect(with_team_state=False, auto_reconnect=True):
logging.info("Waiting for messages")
while True:
sleep(0.1)
update_list = slack.rtm_read()
# Handle each
for update in update_list:
logging.info("RTM Message received")
logging.debug(pformat(update))
yield message_dict_to_event(update)
except (SlackNotConnected, OSError) as e:
logging.exception("Error while reading messages.")
except (ValueError, TypeError) as e:
logging.exception("Malformed message... Restarting connection")
sleep(5)
logging.warning("Connection failed - retrying")
def message_dict_to_event(update: dict) -> Event:
"""
Converts a dict update to an actual event.
"""
event = Event()
# Big logic folks
if update["type"] == "message":
# For now we only handle these basic types of messages involving text
# TODO: Handle "unwrappeable" messages
if "text" in update and "ts" in update:
event.message = RelatedMessageContext(update["ts"], update["text"])
event.was_post = PostMessageContext()
if "channel" in update:
event.conversation = ConversationContext(update["channel"])
if "user" in update:
event.user = UserContext(update["user"])
if "bot_id" in update:
event.bot = BotContext(update["bot_id"])
if "thread_ts" in update:
event.thread = ThreadContext(update["thread_ts"])
# TODO: Handle more types of events, including http data etc.
return event
"""
Methods for easily responding to messages, etc.
"""
T = TypeVar("T")
class VerboseWrapper(Callable):
"""
Generates exception-ready delegates.
Warns of exceptions as they are passed through it, via responding to the given message.
"""
def __init__(self, event: Event):
self.event = event
async def __call__(self, awt: Awaitable[T]) -> T:
try:
return await awt
except Exception as e:
client.get_slack().reply(self.event, "Error: {}".format(str(e)), True)
raise e
from __future__ import annotations
import logging
from dataclasses import dataclass
from pprint import pformat
from time import sleep
from typing import Optional, Generator, Callable, Union, Awaitable
from typing import TypeVar
from slackclient import SlackClient
from slackclient.client import SlackNotConnected
import client
import plugins
"""
Objects to represent things within a slack workspace
"""
# noinspection PyUnresolvedReferences
@dataclass
class User:
id: str
name: str
real_name: Optional[str]
email: Optional[str]
async def get_brother(self) -> Optional[plugins.scroll_util.Brother]:
"""
Try to find the brother corresponding to this user.
"""
return await plugins.identifier.lookup_slackid_brother(self.id)
@dataclass
class Channel:
id: str
name: str
@dataclass
class DirectMessage:
id: str
user_id: str
def get_user(self) -> Optional[User]:
"""
Lookup the user to which this DM corresponds.
"""
return client.get_slack().get_user(self.user_id)
Conversation = Union[Channel, DirectMessage]
"""
Objects to represent attributes an event may contain
"""
@dataclass
class Event:
# Info on if this event ocurred in a particular conversation/channel, and if so which one
conversation: Optional[ConversationContext] = None
# Info on if a particular user caused this event, and if so which one
user: Optional[UserContext] = None
# Info on if this event was a someone posting a message. For contents see related
was_post: Optional[PostMessageContext] = None
# The content of the most relevant message to this event
message: Optional[RelatedMessageContext] = None
# Info if this event was threaded on a parent message, and if so what that message was
thread: Optional[ThreadContext] = None
# Info if this event was an interaction, and if so with what
interaction: Optional[InteractionContext] = None
# Info about regarding if bot caused this event, and if so which one
bot: Optional[BotContext] = None
# If this was posted in a specific channel or conversation
@dataclass
class ConversationContext:
conversation_id: str
def get_conversation(self) -> Optional[Conversation]:
return client.get_slack().get_conversation(self.conversation_id)
# If there is a specific user associated with this event
@dataclass
class UserContext:
user_id: str
def as_user(self) -> Optional[User]:
return client.get_slack().get_user(self.user_id)
# Same but for bots
@dataclass
class BotContext:
bot_id: str
# Whether this was a newly posted message
@dataclass
class PostMessageContext:
pass
# Whether this event was related to a particular message, but not specifically posting it.
# To see if they posted it, check for PostMessageContext
@dataclass
class RelatedMessageContext:
ts: str
text: str
# Whether or not this is a threadable text message
@dataclass
class ThreadContext:
thread_ts: str
@dataclass
class InteractionContext:
response_url: str # Used to confirm/respond to requests
trigger_id: str # Used to open popups
block_id: str # Identifies the block of the interacted component
action_id: str # Identifies the interacted component
action_value: Optional[str] # Identifies the selected value in the component. None for buttons
# If a file was additionally shared
@dataclass
class File:
pass
"""
Objects for interfacing easily with rtm steams, and handling async events
"""
def message_stream(slack: SlackClient) -> Generator[Event, None, None]:
"""
Generator that yields messages from slack.
Messages are in standard api format, look it up.
Checks on 2 second intervals (may be changed)
"""
# Do forever
while True:
try:
if slack.rtm_connect(with_team_state=False, auto_reconnect=True):
logging.info("Waiting for messages")
while True:
sleep(0.1)
update_list = slack.rtm_read()
# Handle each
for update in update_list:
logging.info("RTM Message received")
logging.debug(pformat(update))
yield message_dict_to_event(update)
except (SlackNotConnected, OSError) as e:
logging.exception("Error while reading messages.")
except (ValueError, TypeError) as e:
logging.exception("Malformed message... Restarting connection")
sleep(5)
logging.warning("Connection failed - retrying")
def message_dict_to_event(update: dict) -> Event:
"""
Converts a dict update to an actual event.
"""
event = Event()
# Big logic folks
if update["type"] == "message":
# For now we only handle these basic types of messages involving text
# TODO: Handle "unwrappeable" messages
if "text" in update and "ts" in update:
event.message = RelatedMessageContext(update["ts"], update["text"])
event.was_post = PostMessageContext()
if "channel" in update:
event.conversation = ConversationContext(update["channel"])
if "user" in update:
event.user = UserContext(update["user"])
if "bot_id" in update:
event.bot = BotContext(update["bot_id"])
if "thread_ts" in update:
event.thread = ThreadContext(update["thread_ts"])
# TODO: Handle more types of events, including http data etc.
return event
"""
Methods for easily responding to messages, etc.
"""
T = TypeVar("T")
class VerboseWrapper(Callable):
"""
Generates exception-ready delegates.
Warns of exceptions as they are passed through it, via responding to the given message.
"""
def __init__(self, event: Event):
self.event = event
async def __call__(self, awt: Awaitable[T]) -> T:
try:
return await awt
except Exception as e:
client.get_slack().reply(self.event, "Error: {}".format(str(e)), True)
raise e

File diff suppressed because it is too large Load Diff