1#![warn(
31 rust_2018_idioms,
32 deprecated_in_future,
33 macro_use_extern_crate,
34 missing_debug_implementations,
35 unused_qualifications
36)]
37
38use bytemuck::NoUninit;
39pub use steam::*;
40
41use crate::callbacks::CallbackDispatchers;
42use atomic::Atomic;
43use az::WrappingCast;
44use derive_more::Deref;
45use fnv::FnvHashMap;
46use futures::future::BoxFuture;
47use futures::{FutureExt, Stream};
48use parking_lot::Mutex;
49use snafu::ensure;
50use static_assertions::assert_impl_all;
51use std::convert::TryInto;
52use std::ffi::{CStr, c_void};
53use std::mem::MaybeUninit;
54use std::os::raw::c_char;
55use std::sync::Arc;
56use std::time::Duration;
57use std::{ptr, thread};
58use steamworks_sys as sys;
59use tracing::{Level, event};
60
61pub mod callbacks;
62
63mod steam;
64mod string_ext;
65
/// Lifecycle state of the process-wide Steamworks API, stored in `STEAM_API_STATE`.
///
/// The enum is `repr(u8)` and `NoUninit` so that it can be stored in an `Atomic`.
#[derive(Debug, Copy, Clone, Eq, PartialEq, NoUninit)]
#[repr(u8)]
enum SteamApiState {
    /// The API is not initialized. This is the initial value, and the value restored after a
    /// failed `SteamAPI_Init` call or a completed shutdown.
    Stopped,
    /// `Client::init` has claimed the API. Note that this is stored *before* `SteamAPI_Init`
    /// is called and rolled back to `Stopped` if that call fails.
    Running,
    /// First phase of teardown: shutdown has begun, but `SteamAPI_Shutdown` has not been
    /// called yet.
    ShutdownStage1,
    /// Second phase of teardown: the worker thread has stopped dispatching callbacks, so
    /// `SteamAPI_Shutdown` may be called.
    ShutdownStage2,
}
74
/// Process-wide lifecycle state of the Steamworks API.
///
/// `Client::init` only proceeds if it can atomically move this from `Stopped` to `Running`,
/// which is what prevents the API from being initialized twice at the same time.
static STEAM_API_STATE: Atomic<SteamApiState> = Atomic::new(SteamApiState::Stopped);
76
/// Handle to the initialized Steamworks API, created with [`Client::init`].
///
/// Cloning is cheap: every clone shares the same reference-counted [`ClientInner`].
#[derive(Debug, Clone)]
pub struct Client(Arc<ClientInner>);

// Compile-time guarantee that a `Client` can be moved to and shared between threads.
assert_impl_all!(Client: Send, Sync);
84
/// State shared by every clone of a [`Client`].
#[derive(Debug)]
struct ClientInner {
    /// Dispatchers that the worker thread feeds with every callback that is not a call result.
    callback_dispatchers: CallbackDispatchers,
    /// Senders for call results that are still being awaited, keyed by the API call handle.
    /// The worker thread removes the matching entry and sends the raw result bytes through it
    /// when the call completes.
    call_result_handles:
        Mutex<FnvHashMap<sys::SteamAPICall_t, futures::channel::oneshot::Sender<Vec<u8>>>>,
    // Raw Steamworks interface pointers, each fetched once in `Client::init` from the
    // corresponding versioned `SteamAPI_Steam*` accessor.
    friends: SteamworksInterface<sys::ISteamFriends>,
    remote_storage: SteamworksInterface<sys::ISteamRemoteStorage>,
    ugc: SteamworksInterface<sys::ISteamUGC>,
    user: SteamworksInterface<sys::ISteamUser>,
    user_stats: SteamworksInterface<sys::ISteamUserStats>,
    utils: SteamworksInterface<sys::ISteamUtils>,
}
97
/// Thin wrapper around a raw pointer to a Steamworks interface object.
///
/// Dereferencing the wrapper (through the derived `Deref`) yields the raw `*mut T`, which is
/// what the flat `SteamAPI_I*` functions take as their first argument.
#[derive(Debug, Copy, Clone, Deref)]
struct SteamworksInterface<T>(*mut T);

// Raw pointers are neither `Send` nor `Sync` on their own; these impls are what allow
// `ClientInner` (and therefore `Client`) to be shared across threads.
// SAFETY: NOTE(review): presumably relies on the Steamworks interfaces being safe to call from
// any thread for as long as the API stays initialized — confirm against the Steamworks docs.
unsafe impl<T> Send for SteamworksInterface<T> {}
unsafe impl<T> Sync for SteamworksInterface<T> {}
103
104impl Client {
105 pub fn init() -> Result<Self, InitError> {
113 ensure!(
114 STEAM_API_STATE
115 .compare_exchange(
116 SteamApiState::Stopped,
117 SteamApiState::Running,
118 atomic::Ordering::AcqRel,
119 atomic::Ordering::Acquire
120 )
121 .is_ok(),
122 AlreadyInitializedSnafu
123 );
124
125 let success = unsafe { sys::SteamAPI_Init() };
126 if !success {
127 STEAM_API_STATE.store(SteamApiState::Stopped, atomic::Ordering::Release);
128 return OtherSnafu.fail();
129 }
130
131 unsafe {
132 sys::SteamAPI_ManualDispatch_Init();
133 }
134
135 let utils = unsafe { SteamworksInterface(sys::SteamAPI_SteamUtils_v010()) };
136 unsafe {
137 sys::SteamAPI_ISteamUtils_SetWarningMessageHook(*utils, Some(warning_message_hook));
138 }
139
140 let client = unsafe {
141 Client(Arc::new(ClientInner {
142 callback_dispatchers: CallbackDispatchers::new(),
143 call_result_handles: Mutex::new(FnvHashMap::default()),
144 friends: SteamworksInterface(sys::SteamAPI_SteamFriends_v017()),
145 remote_storage: SteamworksInterface(sys::SteamAPI_SteamRemoteStorage_v014()),
146 ugc: SteamworksInterface(sys::SteamAPI_SteamUGC_v014()),
147 user: SteamworksInterface(sys::SteamAPI_SteamUser_v021()),
148 user_stats: SteamworksInterface(sys::SteamAPI_SteamUserStats_v012()),
149 utils,
150 }))
151 };
152
153 start_worker_thread(client.clone());
154 event!(Level::DEBUG, "Steamworks API initialized");
155
156 Ok(client)
157 }
158
159 pub fn find_leaderboard(
164 &self,
165 leaderboard_name: impl Into<Vec<u8>>,
166 ) -> BoxFuture<'_, Result<user_stats::LeaderboardHandle, user_stats::FindLeaderboardError>>
167 {
168 user_stats::find_leaderboard(self, leaderboard_name.into()).boxed()
169 }
170
171 pub fn query_all_ugc(&self, matching_ugc_type: ugc::MatchingUgcType) -> ugc::QueryAllUgc {
174 ugc::QueryAllUgc::new(self.clone(), matching_ugc_type)
175 }
176
177 pub fn app_id(&self) -> AppId {
179 unsafe { sys::SteamAPI_ISteamUtils_GetAppID(*self.0.utils).into() }
180 }
181
182 pub fn steam_id(&self) -> SteamId {
184 let id = unsafe { sys::SteamAPI_ISteamUser_GetSteamID(*self.0.user) };
185
186 id.into()
187 }
188
189 pub fn on_persona_state_changed(
191 &self,
192 ) -> impl Stream<Item = callbacks::PersonaStateChange> + Send + use<'_> {
193 callbacks::register_to_receive_callback(&self.0.callback_dispatchers.persona_state_change)
194 }
195
196 pub fn on_steam_shutdown(&self) -> impl Stream<Item = ()> + Send + use<'_> {
198 callbacks::register_to_receive_callback(&self.0.callback_dispatchers.steam_shutdown)
199 }
200
201 async unsafe fn register_for_call_result<CallResult: Copy>(
202 &self,
203 handle: sys::SteamAPICall_t,
204 ) -> CallResult {
205 let (tx, rx) = futures::channel::oneshot::channel::<Vec<u8>>();
206 self.0.call_result_handles.lock().insert(handle, tx);
207 rx.map(|result| {
208 let bytes = result.unwrap();
209
210 assert_eq!(bytes.len(), size_of::<CallResult>());
211 unsafe { ptr::read_unaligned(bytes.as_ptr() as *const CallResult) }
212 })
213 .await
214 }
215}
216
217impl Drop for ClientInner {
218 fn drop(&mut self) {
219 STEAM_API_STATE.store(SteamApiState::ShutdownStage1, atomic::Ordering::Release);
220 event!(
221 Level::DEBUG,
222 "Preparing to shutdown Steam API; waiting for worker thread to exit"
223 );
224 loop {
225 thread::sleep(Duration::from_millis(1));
226
227 if STEAM_API_STATE.load(atomic::Ordering::Acquire) == SteamApiState::ShutdownStage2 {
228 break;
229 }
230 }
231
232 event!(Level::DEBUG, "Shutting down Steam API");
233 unsafe {
234 sys::SteamAPI_Shutdown();
235 }
236
237 event!(Level::DEBUG, "Finished shutting down Steam API");
238 STEAM_API_STATE.store(SteamApiState::Stopped, atomic::Ordering::Release);
239 }
240}
241
242unsafe extern "C" fn warning_message_hook(severity: i32, debug_text: *const c_char) {
243 let debug_text = unsafe { CStr::from_ptr(debug_text) };
244 if severity == 1 {
245 event!(Level::WARN, ?debug_text, "Steam API warning");
246 } else {
247 event!(Level::INFO, ?debug_text, "Steam API message");
248 }
249}
250
251fn start_worker_thread(client: Client) {
252 thread::Builder::new().name("Steam API Worker".into()).spawn(move || {
253 unsafe {
254 let steam_pipe = sys::SteamAPI_GetHSteamPipe();
255 loop {
256 sys::SteamAPI_ManualDispatch_RunFrame(steam_pipe);
257 let mut callback_msg: MaybeUninit<sys::CallbackMsg_t> = MaybeUninit::uninit();
258 while sys::SteamAPI_ManualDispatch_GetNextCallback(
259 steam_pipe,
260 callback_msg.as_mut_ptr(),
261 ) {
262 let callback = callback_msg.assume_init();
263
264 if callback.m_iCallback
266 == sys::SteamAPICallCompleted_t_k_iCallback.wrapping_cast()
267 {
268 assert!(!callback.m_pubParam.is_null());
271 assert_eq!(
272 callback
273 .m_pubParam
274 .align_offset(align_of::<sys::SteamAPICallCompleted_t>()),
275 0
276 );
277 let call_completed =
278 &mut *(callback.m_pubParam as *mut sys::SteamAPICallCompleted_t);
279
280 let mut call_result_buf =
281 vec![0_u8; call_completed.m_cubParam.try_into().unwrap()];
282 let mut failed = true;
283 if sys::SteamAPI_ManualDispatch_GetAPICallResult(
284 steam_pipe,
285 call_completed.m_hAsyncCall,
286 call_result_buf.as_mut_ptr() as *mut c_void,
287 call_result_buf.len().try_into().unwrap(),
288 call_completed.m_iCallback,
289 &mut failed,
290 ) {
291 assert!(!failed, "'SteamAPI_ManualDispatch_GetAPICallResult' indicated failure by returning a value of 'true' for its 'pbFailed' parameter");
292
293 let call_id = call_completed.m_hAsyncCall;
294 match client.0.call_result_handles.lock().remove(&call_id) {
295 Some(tx) => {
296 tx.send(call_result_buf).ok();
297 }
298 None => {
299 event!(
300 Level::WARN,
301 SteamAPICallCompleted_t = ?call_completed,
302 "a CallResult became available, but its recipient was not found"
303 );
304 }
305 }
306 } else {
307 panic!("'SteamAPI_ManualDispatch_GetAPICallResult' returned false");
308 }
309 } else {
310 callbacks::dispatch_callbacks(&client.0.callback_dispatchers, callback);
313 }
314
315 sys::SteamAPI_ManualDispatch_FreeLastCallback(steam_pipe);
316 }
317
318 if STEAM_API_STATE
319 .compare_exchange_weak(
320 SteamApiState::ShutdownStage1,
321 SteamApiState::ShutdownStage2,
322 atomic::Ordering::AcqRel,
323 atomic::Ordering::Acquire,
324 )
325 .is_ok()
326 {
327 event!(
328 Level::DEBUG,
329 "worker thread shutting down due to receiving shutdown signal"
330 );
331
332 break;
333 }
334
335 thread::sleep(Duration::from_millis(1));
336 }
337 }
338 }).unwrap();
339}
340
/// Errors returned by [`Client::init`].
#[derive(Debug, snafu::Snafu)]
pub enum InitError {
    /// The API state was not "stopped" when initialization was attempted, i.e. the API is
    /// already initialized (or still in the middle of shutting down).
    #[snafu(display("Tried to initialize Steam API when it was already initialized"))]
    AlreadyInitialized,

    /// `SteamAPI_Init()` returned `false`; no further detail is available.
    #[snafu(display("The Steamworks API failed to initialize (SteamAPI_Init() returned false)"))]
    Other,
}