hikari.impl.bot#
Basic implementation of the components for a single-process bot.
Module Contents#
- class hikari.impl.bot.GatewayBot(token, *, allow_color=True, banner='hikari', executor=None, force_color=False, cache_settings=None, http_settings=None, intents=intents_.Intents.ALL_UNPRIVILEGED, auto_chunk_members=True, logs='INFO', max_rate_limit=300, max_retries=3, proxy_settings=None, rest_url=None)[source]#
Bases:
hikari.traits.GatewayBotAware
Basic auto-sharding bot implementation.
This is the class you will want to use to start, control, and build a bot with.
Note
Settings that control the gateway session are provided to the
GatewayBot.run
and GatewayBot.start
functions in this class. This is done to allow you to contextually customise details such as sharding configuration without having to re-initialize the entire application each time.
- Parameters
- token
str
The bot token to sign in with.
- Other Parameters
- allow_color
bool
Defaulting to
True
, this will enable coloured console logs on any platform that is a TTY. Setting a "CLICOLOR"
environment variable to any non-"0"
value will override this setting.
Users should treat this as advice to the application on whether it is safe to show colours where possible. Since some terminals can be awkward or not support features in a standard way, the option to explicitly disable this is provided. See
force_color
for an alternative.
- banner
typing.Optional
[str
] The package to search for a
banner.txt
in. Defaults to "hikari"
for the "hikari/banner.txt"
banner. Setting this to None
will disable the banner being shown.
- executor
typing.Optional
[concurrent.futures.Executor
] Defaults to
None
. If non-None
, then this executor is used instead of the concurrent.futures.ThreadPoolExecutor
attached to the asyncio.AbstractEventLoop
that the bot will run on. This executor is used primarily for file-IO.
While mainly supporting the
concurrent.futures.ThreadPoolExecutor
implementation in the standard lib, Hikari’s file handling systems should also work with concurrent.futures.ProcessPoolExecutor
, which relies on all objects used in IPC being pickleable. Many third-party libraries will not support this fully though, so your mileage may vary when using ProcessPoolExecutor implementations with this parameter.
- force_color
bool
Defaults to
False
. If True
, then this application will force colour to be used in console-based output. Specifying a "CLICOLOR_FORCE"
environment variable with a non-"0"
value will override this setting.
This will take precedence over
allow_color
if both are specified.
- cache_settings
typing.Optional
[hikari.impl.config.CacheSettings
] Optional cache settings. If unspecified, will use the defaults.
- http_settings
typing.Optional
[hikari.impl.config.HTTPSettings
] Optional custom HTTP configuration settings to use. Allows you to customise functionality such as whether SSL-verification is enabled, what timeouts
aiohttp
should expect to use for requests, and behavior regarding HTTP-redirects.
- intents
hikari.intents.Intents
Defaults to
hikari.intents.Intents.ALL_UNPRIVILEGED
. This allows you to change which intents your application will use on the gateway. This can be used to control and change the types of events you will receive.
- auto_chunk_members
bool
Defaults to
True
. If False
, then no member chunks will be requested automatically, even if there are reasons to do so.
All following statements must be true to automatically request chunks:
  - auto_chunk_members
  is True
  .
  - The members intent is enabled.
  - The server is marked as “large” or the presences intent is not enabled (since Discord only sends other members when presences are declared, we should also chunk small guilds if the presences are not declared).
  - The members cache is enabled or there are listeners for the
  MemberChunkEvent
  .
- logs
typing.Union
[None
,LoggerLevel
,typing.Dict
[str
,typing.Any
]] Defaults to
"INFO"
.
If
None
, then the Python logging system is left uninitialized on startup, and you will need to configure it manually to view most logs that are output by components of this library.
If one of the valid values in a
LoggerLevel
, then this will match a call to colorlog.basicConfig
(a facade for logging.basicConfig
with an additional conduit for enabling coloured logging levels) with the level
kwarg matching this value.
If a
typing.Dict[str, typing.Any]
equivalent, then this value is passed to logging.config.dictConfig
to allow the user to provide a specialized logging configuration of their choice. If any handlers are defined in the dict, default handlers will not be set up.
As a side note, you can always opt to leave this on the default value and then use an incremental
logging.config.dictConfig
that applies any additional changes on top of the base configuration, if you prefer. An example of this can be found in the Examples
section.
Note that
"TRACE_HIKARI"
is a library-specific logging level which is expected to be more verbose than "DEBUG"
.
- max_rate_limit
float
The maximum number of seconds to back off for when rate limited. Anything greater than this will instead raise an error.
This defaults to five minutes (300 seconds). This is to stop potentially indefinitely waiting on an endpoint, which is almost never what you want to do if giving a response to a user.
You can set this to
float("inf")
to disable this check entirely.
Note that this only applies to the REST API component that communicates with Discord, and will not affect sharding or third party HTTP endpoints that may be in use.
- max_retries
typing.Optional
[int
] Maximum number of times a request will be retried if it fails with a
5xx
status. Defaults to 3 if set to None
.
- proxy_settings
typing.Optional
[hikari.impl.config.ProxySettings
] Custom proxy settings to use with network-layer logic in your application to get through an HTTP-proxy.
- rest_url
typing.Optional
[str
] Defaults to the Discord REST API URL if
None
. Can be overridden if you are attempting to point to an unofficial endpoint, or if you are attempting to mock/stub the Discord API for any reason. Generally you do not want to change this.
Examples
Setting up logging using a dictionary configuration:
import os

import hikari

# We want to make gateway logs output as DEBUG, and TRACE for all ratelimit content.
bot = hikari.GatewayBot(
    token=os.environ["BOT_TOKEN"],
    logs={
        "version": 1,
        "incremental": True,
        "loggers": {
            "hikari.gateway": {"level": "DEBUG"},
            "hikari.ratelimits": {"level": "TRACE_HIKARI"},
        },
    },
)
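The conditions listed under auto_chunk_members can be read as one boolean expression. The following is only a sketch of that reading: should_auto_chunk and all of its argument names are hypothetical and are not part of hikari.

```python
def should_auto_chunk(
    auto_chunk_members: bool,
    members_intent: bool,
    presences_intent: bool,
    guild_is_large: bool,
    members_cache_enabled: bool,
    has_chunk_listeners: bool,
) -> bool:
    """Return True only when every documented condition holds (illustrative only)."""
    return (
        auto_chunk_members
        and members_intent
        # Large guilds, or any guild when presences are not declared.
        and (guild_is_large or not presences_intent)
        # Somebody must be able to consume the chunks.
        and (members_cache_enabled or has_chunk_listeners)
    )


# A small guild with the presences intent declared is not chunked...
small_with_presences = should_auto_chunk(True, True, True, False, True, False)
# ...but the same guild is chunked when presences are not declared.
small_without_presences = should_auto_chunk(True, True, False, False, True, False)
```

Writing the rule as a pure function makes it easy to check each combination of intents and cache settings before deciding whether to disable auto_chunk_members.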
- shards: Mapping[int, hikari.api.shard.GatewayShard][source]#
Mapping of shards in this application instance.
Each shard ID is mapped to the corresponding shard instance.
If the application has not started, it is acceptable to assume the result of this call will be an empty mapping.
- dispatch(event)[source]#
Dispatch an event.
- Parameters
- event
hikari.events.base_events.Event
The event to dispatch.
- Returns
asyncio.Future
[typing.Any
]
A future that can be optionally awaited. If awaited, the future will complete once all corresponding event listeners have been invoked. If not awaited, this will schedule the dispatch of the events in the background for later.
See also
Listen
Stream
Subscribe
Unsubscribe
Wait_for
Examples
We can dispatch custom events by first defining a class that derives from
hikari.events.base_events.Event
.
import attr

from hikari.traits import RESTAware
from hikari.events.base_events import Event
from hikari.users import User
from hikari.snowflakes import Snowflake


@attr.define()
class EveryoneMentionedEvent(Event):
    app: RESTAware = attr.field()

    author: User = attr.field()
    '''The user who mentioned everyone.'''

    content: str = attr.field()
    '''The message that was sent.'''

    message_id: Snowflake = attr.field()
    '''The message ID.'''

    channel_id: Snowflake = attr.field()
    '''The channel ID.'''
We can then dispatch our event as we see fit.
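from hikari.events.messages import MessageCreateEvent


@bot.listen(MessageCreateEvent)
async def on_message(event):
    # Message content may be None, so fall back to an empty string.
    content = event.content or ""
    if "@everyone" in content or "@here" in content:
        # EveryoneMentionedEvent declares an "app" field, so it must be supplied.
        custom_event = EveryoneMentionedEvent(
            app=bot,
            author=event.author,
            content=content,
            message_id=event.message_id,
            channel_id=event.channel_id,
        )
        bot.dispatch(custom_event)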
This event can be listened to elsewhere by subscribing to it with
hikari.impl.event_manager_base.EventManager.subscribe
.
@bot.listen(EveryoneMentionedEvent)
async def on_everyone_mentioned(event):
    # The custom event stores the sender in its "author" field.
    print(event.author, "just pinged everyone in", event.channel_id)
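The "optionally awaited" behaviour of the returned future can be illustrated with plain asyncio. This is a minimal sketch under the assumption that listeners are gathered into one future; TinyDispatcher is a hypothetical stand-in and does not reflect how hikari implements dispatching.

```python
import asyncio


class TinyDispatcher:
    """Hypothetical stand-in for an event manager (illustrative only)."""

    def __init__(self):
        self._listeners = []

    def subscribe(self, callback):
        self._listeners.append(callback)

    def dispatch(self, event):
        # gather() schedules every listener right away and returns a future,
        # so the caller may await it or leave it running in the background.
        return asyncio.gather(*(callback(event) for callback in self._listeners))


async def main():
    seen = []

    async def listener(event):
        seen.append(event)

    dispatcher = TinyDispatcher()
    dispatcher.subscribe(listener)

    await dispatcher.dispatch("first")  # completes once all listeners have run
    dispatcher.dispatch("second")       # not awaited: runs in the background
    await asyncio.sleep(0)              # yield so the background task gets a turn
    return seen


result = asyncio.run(main())
```

Awaiting the future is useful when later code depends on the listeners having finished; leaving it unawaited keeps the caller from blocking on slow listeners.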
- get_listeners(event_type, /, *, polymorphic=True)[source]#
Get the listeners for a given event type, if there are any.
- Parameters
- event_type
typing.Type
[EventT
] The event type to look for.
EventT
must be a subclass of hikari.events.base_events.Event
.
- polymorphic
bool
If
True
, this will also return the listeners of the subclasses of the given event type. If False
, then only listeners for this class specifically are returned. The default is True
.
- Returns
typing.Collection
[typing.Callable
[[EventT
],typing.Coroutine
[typing.Any
,typing.Any
,None
]]]
A copy of the collection of listeners for the event. Will return an empty collection if nothing is registered.
- listen(*event_types)[source]#
Generate a decorator to subscribe a callback to an event type.
This is a second-order decorator.
- Parameters
- *event_types
typing.Optional
[typing.Type
[EventT
]] The event types to subscribe to. The implementation may allow this to be undefined. If this is the case, the event type will be inferred instead from the type hints on the function signature.
EventT
must be a subclass of hikari.events.base_events.Event
.
- Returns
typing.Callable
[[EventT
],EventT
]
A decorator for a coroutine function that passes it to
EventManager.subscribe
before returning the function reference.
See also
Dispatch
Stream
Subscribe
Unsubscribe
Wait_for
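As an illustration of what "second-order decorator" and type-hint inference mean here, the sketch below builds a small stand-alone listen function. The Event classes, the subscriptions list, and the inference rule (use the first annotated parameter) are assumptions made for the example, not hikari's actual implementation.

```python
import typing


class Event: ...
class MessageEvent(Event): ...


subscriptions = []  # (event_type, callback) pairs recorded by this sketch


def listen(*event_types):
    """Return a decorator that records the callback for each event type."""

    def decorator(callback):
        resolved = event_types
        if not resolved:
            # No explicit types: infer one from the first annotated parameter.
            hints = typing.get_type_hints(callback)
            hints.pop("return", None)
            resolved = (next(iter(hints.values())),)
        for event_type in resolved:
            subscriptions.append((event_type, callback))
        return callback  # the original function is returned unchanged

    return decorator


@listen(MessageEvent)
async def explicit(event): ...


@listen()
async def inferred(event: MessageEvent) -> None: ...
```

Because listen is called first and returns the real decorator, the parentheses are needed even when no event types are passed.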
- static print_banner(banner, allow_color, force_color, extra_args=None)[source]#
Print the banner.
This allows library vendors to override this behaviour, or choose to inject their own “branding” on top of what hikari provides by default.
Normal users should not need to invoke this function, and can simply change the
banner
argument passed to the constructor to manipulate what is displayed.
- Parameters
- banner
typing.Optional
[str
] The package to find a
banner.txt
in.
- allow_color
bool
A flag that allows advising whether to allow color if supported or not. Can be overridden by setting a
"CLICOLOR"
environment variable to a non-"0"
string.
- force_color
bool
A flag that allows forcing color to always be output, even if the terminal device may not support it. Setting the
"CLICOLOR_FORCE"
environment variable to a non-"0"
string will override this.
This will take precedence over
allow_color
if both are specified.
- extra_args
typing.Optional
[typing.Dict
[str
,str
]] If provided, extra $-substitutions to use when printing the banner. Default substitutions can not be overwritten.
- Raises
ValueError
If
extra_args
contains a default $-substitution.
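The rule that extra_args may add $-substitutions but never replace a default one can be sketched with the standard library's string.Template. Whether hikari uses string.Template internally is an assumption; render_banner and the DEFAULT_ARGS values below are made up for the example.

```python
import string

# Made-up defaults for the sketch; the real default substitutions are not listed here.
DEFAULT_ARGS = {"version": "1.0", "python": "3.x"}


def render_banner(template_text, extra_args=None):
    """Fill $-placeholders, refusing to let extra_args shadow a default."""
    args = dict(DEFAULT_ARGS)
    if extra_args:
        clashes = sorted(DEFAULT_ARGS.keys() & extra_args.keys())
        if clashes:
            raise ValueError(f"Cannot overwrite default substitutions: {clashes}")
        args.update(extra_args)
    # safe_substitute leaves unknown $-placeholders untouched instead of raising.
    return string.Template(template_text).safe_substitute(args)


banner = render_banner("app $version by $vendor", {"vendor": "acme"})
```

A library vendor could use the same pattern to add its own branding fields while leaving the built-in ones alone.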
- run(*, activity=None, afk=False, asyncio_debug=None, check_for_updates=True, close_passed_executor=False, close_loop=True, coroutine_tracking_depth=None, enable_signal_handlers=None, idle_since=None, ignore_session_start_limit=False, large_threshold=250, propagate_interrupts=False, status=presences.Status.ONLINE, shard_ids=None, shard_count=None)[source]#
Start the bot and block until it has shut down.
- Other Parameters
- activity
typing.Optional
[hikari.presences.Activity
] The initial activity to display in the bot user presence, or
None
(default) to not show any.
- afk
bool
The initial AFK state to display in the bot user presence, or
False
(default) to not show any.
- asyncio_debug
bool
Defaults to
False
. If True
, then debugging is enabled for the asyncio event loop in use.
- check_for_updates
bool
Defaults to
True
. If True
, will check for newer versions of hikari
on PyPI and notify if available.
- close_passed_executor
bool
Defaults to
False
. If True
, any custom concurrent.futures.Executor
passed to the constructor will be shut down when the application terminates. This does not affect the default executor associated with the event loop, and will not do anything if you do not provide a custom executor to the constructor.
- close_loop
bool
Defaults to
True
. If True
, then once all components have shut down permanently during application shutdown, all asyncgens and background tasks will be destroyed, and the event loop will be shut down.
This will wait until all
hikari
-owned aiohttp
connectors have had time to attempt to shut down correctly (around 250ms), and on Python 3.9 and newer, will also shut down the default event loop executor too.
- coroutine_tracking_depth
typing.Optional
[int
] Defaults to
None
. If an integer value and supported by the interpreter, then this many nested coroutine calls will be tracked with their call origin state. This allows you to determine where non-awaited coroutines may originate from, but generally you do not want to leave this enabled for performance reasons.
- enable_signal_handlers
typing.Optional
[bool] Defaults to
True
if this is started in the main thread.
If on a non-Windows OS with builtin support for kernel-level POSIX signals, then setting this to
True
will treat keyboard interrupts and other OS signals as requests to shut down the application properly, rather than killing the process immediately in a dirty state. You should leave this enabled unless you plan to implement your own signal handling.
- idle_since
typing.Optional
[datetime.datetime
] The
datetime.datetime
the user should be marked as being idle since, or None
(default) to not show this.
- ignore_session_start_limit
bool
Defaults to
False
. If False
, then attempting to start more sessions than you are allowed in a 24 hour window will throw a RuntimeError
rather than going ahead and hitting the IDENTIFY limit, which may result in your token being reset. Setting to True
disables this behavior.
- large_threshold
int
Threshold for members in a guild before it is treated as being “large” and no longer sending member details in the
GUILD_CREATE
event. Defaults to 250
.
- propagate_interrupts
bool
Defaults to
False
. If set to True
, then any internal hikari.errors.HikariInterrupt
that is raised as a result of catching an OS level signal will result in the exception being rethrown once the application has closed. This can allow you to use hikari signal handlers and still be able to determine what kind of interrupt the application received after it closes. When False
, nothing is raised and the call will terminate cleanly and silently where possible instead.
- shard_ids
typing.Optional
[typing.Sequence
[int
]] The shard IDs to create shards for. If not
None
, then a non-None
shard_count
must ALSO be provided. Defaults to None
, which means the Discord-recommended count is used for your application instead.
Note that the sequence will be de-duplicated.
- shard_count
typing.Optional
[int
] The number of shards to use in the entire distributed application. Defaults to
None
which results in the count being determined dynamically on startup.
- status
hikari.presences.Status
The initial status to show for the user presence on startup. Defaults to
hikari.presences.Status.ONLINE
.
- Raises
hikari.errors.ComponentStateConflictError
If bot is already running.
TypeError
If
shard_ids
is passed without shard_count
.
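The relationship between shard_ids and shard_count described above (a TypeError when only shard_ids is given, and de-duplication of the sequence) can be sketched as a small validation helper. normalise_shards is a hypothetical name, and sorting the result is a choice made for the example rather than documented behaviour.

```python
def normalise_shards(shard_ids=None, shard_count=None):
    """Validate the shard arguments and de-duplicate the IDs (illustrative only)."""
    if shard_ids is not None and shard_count is None:
        raise TypeError("'shard_ids' requires 'shard_count' to be passed as well")
    if shard_ids is None:
        # Nothing explicit was requested; the count may still be None here,
        # meaning it would be determined dynamically on startup.
        return None, shard_count
    # De-duplicate; sorting just gives the sketch a stable order.
    return sorted(set(shard_ids)), shard_count


ids, count = normalise_shards([0, 1, 1, 2], 4)
```

Running several processes, each with its own shard_ids but the same shard_count, is the usual reason to pass both values explicitly.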
- async start(*, activity=None, afk=False, check_for_updates=True, idle_since=None, ignore_session_start_limit=False, large_threshold=250, shard_ids=None, shard_count=None, status=presences.Status.ONLINE)[source]#
Start the bot, wait for all shards to become ready, and then return.
- Other Parameters
- activity
typing.Optional
[hikari.presences.Activity
] The initial activity to display in the bot user presence, or
None
(default) to not show any.
- afk
bool
The initial AFK state to display in the bot user presence, or
False
(default) to not show any.
- check_for_updates
bool
Defaults to
True
. If True
, will check for newer versions of hikari
on PyPI and notify if available.
- idle_since
typing.Optional
[datetime.datetime
] The
datetime.datetime
the user should be marked as being idle since, or None
(default) to not show this.
- ignore_session_start_limit
bool
Defaults to
False
. If False
, then attempting to start more sessions than you are allowed in a 24 hour window will throw a RuntimeError
rather than going ahead and hitting the IDENTIFY limit, which may result in your token being reset. Setting to True
disables this behavior.
- large_threshold
int
Threshold for members in a guild before it is treated as being “large” and no longer sending member details in the
GUILD_CREATE
event. Defaults to 250
.
- shard_ids
typing.Optional
[typing.Sequence
[int
]] The shard IDs to create shards for. If not
None
, then a non-None
shard_count
must ALSO be provided. Defaults to None
, which means the Discord-recommended count is used for your application instead.
Note that the sequence will be de-duplicated.
- shard_count
typing.Optional
[int
] The number of shards to use in the entire distributed application. Defaults to
None
which results in the count being determined dynamically on startup.
- status
hikari.presences.Status
The initial status to show for the user presence on startup. Defaults to
hikari.presences.Status.ONLINE
.
- Raises
TypeError
If
shard_ids
is passed without shard_count
.
hikari.errors.ComponentStateConflictError
If bot is already running.
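The ignore_session_start_limit guard can be pictured as a simple comparison made before any session is opened. The helper below is a sketch only: check_session_start_limit, shards_to_start, and remaining_sessions are hypothetical names, and how hikari obtains the remaining session count is not shown here.

```python
def check_session_start_limit(
    shards_to_start, remaining_sessions, ignore_session_start_limit=False
):
    """Raise RuntimeError when more sessions are requested than remain."""
    if not ignore_session_start_limit and shards_to_start > remaining_sessions:
        raise RuntimeError(
            f"Starting {shards_to_start} session(s) would exceed the "
            f"{remaining_sessions} remaining in the current window"
        )
    return shards_to_start


allowed = check_session_start_limit(2, remaining_sessions=5)
forced = check_session_start_limit(
    8, remaining_sessions=5, ignore_session_start_limit=True
)
```

Failing early like this is the safer default, since exceeding the IDENTIFY limit may result in the token being reset.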
- stream(event_type, /, timeout, limit=None)[source]#
Return a stream iterator for the given event and sub-events.
Warning
If you use
stream.open()
to start the stream then you must also close it with stream.close()
, otherwise it may queue events in memory indefinitely.
- Parameters
- event_type
typing.Type
[hikari.events.base_events.Event
] The event type to listen for. This will listen for subclasses of this type additionally.
- timeout
typing.Optional
[int
,float
] How long this streamer should wait for the next event before ending the iteration. If
None
then this will continue until explicitly broken from.
- limit
typing.Optional
[int
] The limit for how many events this should queue at one time before dropping extra incoming events. Leave this as
None
for the cache size to be unlimited.
- Returns
EventStream
[hikari.events.base_events.Event
]
The async iterator to handle streamed events. This must be started with
with stream:
or stream.open()
before asynchronously iterating over it.
See also
Dispatch
Listen
Subscribe
Unsubscribe
Wait_for
Examples
with bot.stream(events.ReactionAddEvent, timeout=30).filter(("message_id", message.id)) as stream:
    async for user_id in stream.map("user_id").limit(50):
        ...
or using
open()
and close()
stream = bot.stream(events.ReactionAddEvent, timeout=30).filter(("message_id", message.id))
stream.open()
async for user_id in stream.map("user_id").limit(50):
    ...
stream.close()
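The effect of the limit parameter, where events arriving while the queue is full are dropped, can be illustrated with a small bounded buffer. BoundedEventBuffer is a hypothetical class written for this sketch and is not the EventStream implementation.

```python
from collections import deque


class BoundedEventBuffer:
    """Queue that drops new events once `limit` items are waiting (illustrative only)."""

    def __init__(self, limit=None):
        self._limit = limit
        self._queue = deque()
        self.dropped = 0

    def push(self, event):
        # With limit=None the buffer grows without bound, which is why an
        # opened stream that is never closed can hold events indefinitely.
        if self._limit is not None and len(self._queue) >= self._limit:
            self.dropped += 1
            return False
        self._queue.append(event)
        return True

    def pop(self):
        return self._queue.popleft()


buffer = BoundedEventBuffer(limit=2)
accepted = [buffer.push(n) for n in range(4)]
```

Choosing a finite limit trades completeness for bounded memory when the consumer is slower than the incoming events.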
- subscribe(event_type, callback)[source]#
Subscribe a given callback to a given event type.
- Parameters
- event_type
typing.Type
[T
] The event type to listen for. This will also listen for any subclasses of the given type.
T
must be a subclass of hikari.events.base_events.Event
.
- callback
Must be a coroutine function to invoke. This should consume an instance of the given event, or an instance of a valid subclass if one exists. Any result is discarded.
See also
Dispatch
Listen
Stream
Unsubscribe
Wait_for
Examples
The following demonstrates subscribing a callback to message creation events.
from hikari.events.messages import MessageCreateEvent


async def on_message(event):
    ...


bot.subscribe(MessageCreateEvent, on_message)
- unsubscribe(event_type, callback)[source]#
Unsubscribe a given callback from a given event type, if present.
- Parameters
- event_type
typing.Type
[T
] The event type to unsubscribe from. This must be the same exact type as was originally subscribed with to be removed correctly.
T
must derive from hikari.events.base_events.Event
.
- callback
The callback to unsubscribe.
See also
Dispatch
Listen
Stream
Subscribe
Wait_for
Examples
The following demonstrates unsubscribing a callback from a message creation event.
from hikari.events.messages import MessageCreateEvent


async def on_message(event):
    ...


bot.unsubscribe(MessageCreateEvent, on_message)
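The requirement that unsubscribing uses the exact type that was subscribed can be shown with a tiny registry keyed by event type. ListenerRegistry, BaseEvent, and ChildEvent are hypothetical names for this sketch; it does not model how subscribed callbacks are matched against subclasses at dispatch time.

```python
from collections import defaultdict


class ListenerRegistry:
    """Hypothetical registry keyed by exact event type (illustrative only)."""

    def __init__(self):
        self._listeners = defaultdict(list)

    def subscribe(self, event_type, callback):
        self._listeners[event_type].append(callback)

    def unsubscribe(self, event_type, callback):
        # Only the entry stored under this exact type is searched.
        callbacks = self._listeners.get(event_type, [])
        if callback in callbacks:
            callbacks.remove(callback)

    def get_listeners(self, event_type):
        # Return a copy so callers cannot mutate the registry.
        return tuple(self._listeners.get(event_type, ()))


class BaseEvent: ...
class ChildEvent(BaseEvent): ...


async def on_child(event): ...


registry = ListenerRegistry()
registry.subscribe(ChildEvent, on_child)
registry.unsubscribe(BaseEvent, on_child)   # different type: nothing is removed
still_there = registry.get_listeners(ChildEvent)
registry.unsubscribe(ChildEvent, on_child)  # exact type: the callback is removed
after_removal = registry.get_listeners(ChildEvent)
```

Unsubscribing a callback that was never registered is a no-op here, which mirrors the "if present" wording of the method description.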
- async wait_for(event_type, /, timeout, predicate=None)[source]#
Wait for a given event to occur once, then return the event.
Warning
Async predicates are not supported.
- Parameters
- event_type
typing.Type
[hikari.events.base_events.Event
] The event type to listen for. This will listen for subclasses of this type additionally.
- predicate
A function taking the event as the single parameter. This should return
True
if the event is one you want to return, or False
if the event should not be returned. If left as None
(the default), then the first matching event type that the bot receives (or any subtype) will be the one returned.
- timeout
typing.Union
[float
,int
,None
] The amount of time to wait before raising an
asyncio.TimeoutError
and giving up instead. This is measured in seconds. If None
, then this will wait indefinitely (waiting without a timeout can result in “leaking” coroutines that never complete if called in an uncontrolled way, so it is not recommended).
- Returns
hikari.events.base_events.Event
The event that was provided.
- Raises
asyncio.TimeoutError
If the timeout is not
None
and is reached before an event is received that the predicate returns True
for.
See also
Dispatch
Listen
Stream
Subscribe
Unsubscribe
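The interaction between the synchronous predicate and the timeout can be sketched with a future and asyncio.wait_for. TinyWaiter is a hypothetical class written for this example; it only assumes that a waiter is resolved by the first event the predicate accepts.

```python
import asyncio


class TinyWaiter:
    """Hypothetical one-shot waiter registry (illustrative only)."""

    def __init__(self):
        self._waiters = []

    def dispatch(self, event):
        for predicate, future in list(self._waiters):
            # The predicate is a plain function; async predicates are not handled.
            if not future.done() and (predicate is None or predicate(event)):
                future.set_result(event)

    async def wait_for(self, timeout, predicate=None):
        future = asyncio.get_running_loop().create_future()
        entry = (predicate, future)
        self._waiters.append(entry)
        try:
            return await asyncio.wait_for(future, timeout=timeout)
        finally:
            # Always deregister, whether we matched or timed out.
            self._waiters.remove(entry)


async def main():
    waiter = TinyWaiter()
    task = asyncio.create_task(
        waiter.wait_for(timeout=1, predicate=lambda event: event > 10)
    )
    await asyncio.sleep(0)  # let the task register its waiter
    waiter.dispatch(5)      # rejected by the predicate
    waiter.dispatch(42)     # accepted
    matched = await task

    try:
        await waiter.wait_for(timeout=0.01)  # nothing is dispatched this time
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return matched, timed_out


matched, timed_out = asyncio.run(main())
```

Removing the waiter in the finally block is what prevents the "leaking" the timeout description warns about when a wait is abandoned.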