Milestone 5: deliver embedded RDP sessions and lifecycle hardening

This commit is contained in:
Keith Smith
2026-03-03 18:59:26 -07:00
parent 230a401386
commit 36006bd4aa
2941 changed files with 724359 additions and 77 deletions

View File

@@ -0,0 +1,531 @@
/**
* WinPR: Windows Portable Runtime
* Object Pool
*
* Copyright 2012 Marc-Andre Moreau <marcandre.moreau@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <winpr/config.h>
#include <winpr/crt.h>
#include <winpr/wlog.h>
#include <winpr/collections.h>
#include "../stream.h"
#include "../log.h"
#define TAG WINPR_TAG("utils.streampool")
struct s_StreamPoolEntry
{
#if defined(WITH_STREAMPOOL_DEBUG)
char** msg;
size_t lines;
#endif
wStream* s;
};
struct s_wStreamPool
{
size_t aSize;
size_t aCapacity;
struct s_StreamPoolEntry* aArray;
size_t uSize;
size_t uCapacity;
struct s_StreamPoolEntry* uArray;
CRITICAL_SECTION lock;
BOOL synchronized;
size_t defaultSize;
};
static void discard_entry(struct s_StreamPoolEntry* entry, BOOL discardStream)
{
if (!entry)
return;
#if defined(WITH_STREAMPOOL_DEBUG)
free((void*)entry->msg);
#endif
if (discardStream && entry->s)
Stream_Free(entry->s, entry->s->isAllocatedStream);
const struct s_StreamPoolEntry empty = WINPR_C_ARRAY_INIT;
*entry = empty;
}
static struct s_StreamPoolEntry add_entry(wStream* s)
{
struct s_StreamPoolEntry entry = WINPR_C_ARRAY_INIT;
#if defined(WITH_STREAMPOOL_DEBUG)
void* stack = winpr_backtrace(20);
if (stack)
entry.msg = winpr_backtrace_symbols(stack, &entry.lines);
winpr_backtrace_free(stack);
#endif
entry.s = s;
return entry;
}
/**
* Lock the stream pool
*/
static inline void StreamPool_Lock(wStreamPool* pool)
{
WINPR_ASSERT(pool);
if (pool->synchronized)
EnterCriticalSection(&pool->lock);
}
/**
* Unlock the stream pool
*/
static inline void StreamPool_Unlock(wStreamPool* pool)
{
WINPR_ASSERT(pool);
if (pool->synchronized)
LeaveCriticalSection(&pool->lock);
}
static BOOL StreamPool_EnsureCapacity(wStreamPool* pool, size_t count, BOOL usedOrAvailable)
{
WINPR_ASSERT(pool);
size_t* cap = (usedOrAvailable) ? &pool->uCapacity : &pool->aCapacity;
size_t* size = (usedOrAvailable) ? &pool->uSize : &pool->aSize;
struct s_StreamPoolEntry** array = (usedOrAvailable) ? &pool->uArray : &pool->aArray;
size_t new_cap = 0;
if (*cap == 0)
new_cap = *size + count;
else if (*size + count > *cap)
new_cap = (*size + count + 2) / 2 * 3;
else if ((*size + count) < *cap / 3)
new_cap = *cap / 2;
if (new_cap > 0)
{
struct s_StreamPoolEntry* new_arr = nullptr;
if (*cap < *size + count)
*cap += count;
new_arr =
(struct s_StreamPoolEntry*)realloc(*array, sizeof(struct s_StreamPoolEntry) * new_cap);
if (!new_arr)
return FALSE;
*cap = new_cap;
*array = new_arr;
}
return TRUE;
}
/**
* Methods
*/
static void StreamPool_ShiftUsed(wStreamPool* pool, size_t index)
{
WINPR_ASSERT(pool);
const size_t pcount = 1;
const size_t off = index + pcount;
if (pool->uSize >= off)
{
for (size_t x = 0; x < pcount; x++)
{
struct s_StreamPoolEntry* cur = &pool->uArray[index + x];
discard_entry(cur, FALSE);
}
MoveMemory(&pool->uArray[index], &pool->uArray[index + pcount],
(pool->uSize - index - pcount) * sizeof(struct s_StreamPoolEntry));
pool->uSize -= pcount;
}
}
/**
* Adds a used stream to the pool.
*/
static void StreamPool_AddUsed(wStreamPool* pool, wStream* s)
{
StreamPool_EnsureCapacity(pool, 1, TRUE);
pool->uArray[pool->uSize] = add_entry(s);
pool->uSize++;
}
/**
* Removes a used stream from the pool.
*/
static void StreamPool_RemoveUsed(wStreamPool* pool, wStream* s)
{
WINPR_ASSERT(pool);
for (size_t index = 0; index < pool->uSize; index++)
{
struct s_StreamPoolEntry* cur = &pool->uArray[index];
if (cur->s == s)
{
StreamPool_ShiftUsed(pool, index);
break;
}
}
}
static void StreamPool_ShiftAvailable(wStreamPool* pool, size_t index)
{
WINPR_ASSERT(pool);
const size_t pcount = 1;
const size_t off = index + pcount;
if (pool->aSize >= off)
{
for (size_t x = 0; x < pcount; x++)
{
struct s_StreamPoolEntry* cur = &pool->aArray[index + x];
discard_entry(cur, FALSE);
}
MoveMemory(&pool->aArray[index], &pool->aArray[index + pcount],
(pool->aSize - index - pcount) * sizeof(struct s_StreamPoolEntry));
pool->aSize -= pcount;
}
}
/**
* Gets a stream from the pool.
*/
wStream* StreamPool_Take(wStreamPool* pool, size_t size)
{
BOOL found = FALSE;
size_t foundIndex = 0;
wStream* s = nullptr;
StreamPool_Lock(pool);
if (size == 0)
size = pool->defaultSize;
for (size_t index = 0; index < pool->aSize; index++)
{
struct s_StreamPoolEntry* cur = &pool->aArray[index];
s = cur->s;
if (Stream_Capacity(s) >= size)
{
found = TRUE;
foundIndex = index;
break;
}
}
if (!found)
{
s = Stream_New(nullptr, size);
if (!s)
goto out_fail;
}
else if (s)
{
Stream_ResetPosition(s);
Stream_SetLength(s, Stream_Capacity(s));
StreamPool_ShiftAvailable(pool, foundIndex);
}
if (s)
{
s->pool = pool;
s->count = 1;
StreamPool_AddUsed(pool, s);
}
out_fail:
StreamPool_Unlock(pool);
return s;
}
/**
* Returns an object to the pool.
*/
static void StreamPool_Remove(wStreamPool* pool, wStream* s)
{
StreamPool_EnsureCapacity(pool, 1, FALSE);
Stream_EnsureValidity(s);
for (size_t x = 0; x < pool->aSize; x++)
{
wStream* cs = pool->aArray[x].s;
if (cs == s)
return;
}
pool->aArray[(pool->aSize)++] = add_entry(s);
StreamPool_RemoveUsed(pool, s);
}
static void StreamPool_ReleaseOrReturn(wStreamPool* pool, wStream* s)
{
StreamPool_Lock(pool);
StreamPool_Remove(pool, s);
StreamPool_Unlock(pool);
}
void StreamPool_Return(wStreamPool* pool, wStream* s)
{
WINPR_ASSERT(pool);
if (!s)
return;
StreamPool_Lock(pool);
StreamPool_Remove(pool, s);
StreamPool_Unlock(pool);
}
/**
* Increment stream reference count
*/
void Stream_AddRef(wStream* s)
{
WINPR_ASSERT(s);
s->count++;
}
/**
* Decrement stream reference count
*/
void Stream_Release(wStream* s)
{
WINPR_ASSERT(s);
if (s->count > 0)
s->count--;
if (s->count == 0)
{
if (s->pool)
StreamPool_ReleaseOrReturn(s->pool, s);
else
Stream_Free(s, TRUE);
}
}
/**
* Find stream in pool using pointer inside buffer
*/
wStream* StreamPool_Find(wStreamPool* pool, const BYTE* ptr)
{
wStream* s = nullptr;
StreamPool_Lock(pool);
for (size_t index = 0; index < pool->uSize; index++)
{
struct s_StreamPoolEntry* cur = &pool->uArray[index];
if ((ptr >= Stream_Buffer(cur->s)) &&
(ptr < (Stream_Buffer(cur->s) + Stream_Capacity(cur->s))))
{
s = cur->s;
break;
}
}
StreamPool_Unlock(pool);
return s;
}
/**
* Releases the streams currently cached in the pool.
*/
void StreamPool_Clear(wStreamPool* pool)
{
StreamPool_Lock(pool);
for (size_t x = 0; x < pool->aSize; x++)
{
struct s_StreamPoolEntry* cur = &pool->aArray[x];
discard_entry(cur, TRUE);
}
pool->aSize = 0;
if (pool->uSize > 0)
{
WLog_WARN(TAG, "Clearing StreamPool, but there are %" PRIuz " streams currently in use",
pool->uSize);
for (size_t x = 0; x < pool->uSize; x++)
{
struct s_StreamPoolEntry* cur = &pool->uArray[x];
discard_entry(cur, TRUE);
}
pool->uSize = 0;
}
StreamPool_Unlock(pool);
}
size_t StreamPool_UsedCount(wStreamPool* pool)
{
StreamPool_Lock(pool);
size_t usize = pool->uSize;
StreamPool_Unlock(pool);
return usize;
}
/**
* Construction, Destruction
*/
wStreamPool* StreamPool_New(BOOL synchronized, size_t defaultSize)
{
wStreamPool* pool = nullptr;
pool = (wStreamPool*)calloc(1, sizeof(wStreamPool));
if (pool)
{
pool->synchronized = synchronized;
pool->defaultSize = defaultSize;
if (!StreamPool_EnsureCapacity(pool, 32, FALSE))
goto fail;
if (!StreamPool_EnsureCapacity(pool, 32, TRUE))
goto fail;
if (!InitializeCriticalSectionAndSpinCount(&pool->lock, 4000))
goto fail;
}
return pool;
fail:
WINPR_PRAGMA_DIAG_PUSH
WINPR_PRAGMA_DIAG_IGNORED_MISMATCHED_DEALLOC
StreamPool_Free(pool);
WINPR_PRAGMA_DIAG_POP
return nullptr;
}
void StreamPool_Free(wStreamPool* pool)
{
if (pool)
{
StreamPool_Clear(pool);
DeleteCriticalSection(&pool->lock);
free(pool->aArray);
free(pool->uArray);
free(pool);
}
}
char* StreamPool_GetStatistics(wStreamPool* pool, char* buffer, size_t size)
{
WINPR_ASSERT(pool);
if (!buffer || (size < 1))
return nullptr;
size_t used = 0;
int offset = _snprintf(buffer, size - 1,
"aSize =%" PRIuz ", uSize =%" PRIuz ", aCapacity=%" PRIuz
", uCapacity=%" PRIuz,
pool->aSize, pool->uSize, pool->aCapacity, pool->uCapacity);
if ((offset > 0) && ((size_t)offset < size))
used += (size_t)offset;
#if defined(WITH_STREAMPOOL_DEBUG)
StreamPool_Lock(pool);
offset = _snprintf(&buffer[used], size - 1 - used, "\n-- dump used array take locations --\n");
if ((offset > 0) && ((size_t)offset < size - used))
used += (size_t)offset;
for (size_t x = 0; x < pool->uSize; x++)
{
const struct s_StreamPoolEntry* cur = &pool->uArray[x];
WINPR_ASSERT(cur->msg || (cur->lines == 0));
for (size_t y = 0; y < cur->lines; y++)
{
offset = _snprintf(&buffer[used], size - 1 - used, "[%" PRIuz " | %" PRIuz "]: %s\n", x,
y, cur->msg[y]);
if ((offset > 0) && ((size_t)offset < size - used))
used += (size_t)offset;
}
}
offset = _snprintf(&buffer[used], size - 1 - used, "\n-- statistics called from --\n");
if ((offset > 0) && ((size_t)offset < size - used))
used += (size_t)offset;
struct s_StreamPoolEntry entry = WINPR_C_ARRAY_INIT;
void* stack = winpr_backtrace(20);
if (stack)
entry.msg = winpr_backtrace_symbols(stack, &entry.lines);
winpr_backtrace_free(stack);
for (size_t x = 0; x < entry.lines; x++)
{
const char* msg = entry.msg[x];
offset = _snprintf(&buffer[used], size - 1 - used, "[%" PRIuz "]: %s\n", x, msg);
if ((offset > 0) && ((size_t)offset < size - used))
used += (size_t)offset;
}
free((void*)entry.msg);
StreamPool_Unlock(pool);
#endif
buffer[used] = '\0';
return buffer;
}
BOOL StreamPool_WaitForReturn(wStreamPool* pool, UINT32 timeoutMS)
{
wLog* log = WLog_Get(TAG);
/* HACK: We disconnected the transport above, now wait without a read or write lock until all
* streams in use have been returned to the pool. */
while (timeoutMS > 0)
{
const size_t used = StreamPool_UsedCount(pool);
if (used == 0)
return TRUE;
WLog_Print(log, WLOG_DEBUG, "%" PRIuz " streams still in use, sleeping...", used);
char buffer[4096] = WINPR_C_ARRAY_INIT;
StreamPool_GetStatistics(pool, buffer, sizeof(buffer));
WLog_Print(log, WLOG_TRACE, "Pool statistics: %s", buffer);
UINT32 diff = 10;
if (timeoutMS != INFINITE)
{
diff = timeoutMS > 10 ? 10 : timeoutMS;
timeoutMS -= diff;
}
Sleep(diff);
}
return FALSE;
}