Skip to content

Instantly share code, notes, and snippets.

@ThunderEX
Created April 12, 2020 03:55
Show Gist options
  • Select an option

  • Save ThunderEX/67c6f6a939da28c0316a699449190f15 to your computer and use it in GitHub Desktop.

Select an option

Save ThunderEX/67c6f6a939da28c0316a699449190f15 to your computer and use it in GitHub Desktop.
psutil.cpu_count(logical=False) in ctypes
from ctypes import LibraryLoader, WinDLL, WinError, GetLastError
from ctypes import Structure, Union, POINTER, create_string_buffer, cast, byref, sizeof
from ctypes import c_int, c_uint, c_long, c_ulong, c_int64, c_uint64, c_void_p
from ctypes.wintypes import BOOL, BYTE, WORD, DWORD, WCHAR, HANDLE
from platform import machine
# Module-private DLL loader built on WinDLL.
# NOTE(review): presumably used instead of the shared `ctypes.windll` so the
# argtypes/restype configured further down stay local to this module - confirm.
windll = LibraryLoader(WinDLL)
# Windows API from https://github.com/Dobatymo/ctypes-windows-sdk/blob/6f984f4753fcd83cd57f045b204d0be0d702d41c/cwinsdk/um/sysinfoapi.py
# Pointer-width integer aliases (the Windows *_PTR family).
#
# The width must follow the *running interpreter*, not the machine: a 32-bit
# Python on 64-bit Windows still reports an "...64" machine name, which would
# make every *_PTR alias (and any structure built from them) 8 bytes wide in
# a process whose pointers are 4 bytes. sizeof(c_void_p) is the pointer size
# of the current process, so it is the reliable switch.
X64 = sizeof(c_void_p) == 8
if X64:
    INT_PTR = c_int64
    UINT_PTR = c_uint64
    LONG_PTR = c_int64
    ULONG_PTR = c_uint64
else:
    INT_PTR = c_int
    UINT_PTR = c_uint
    LONG_PTR = c_long
    ULONG_PTR = c_ulong
# Same pointer type as before in both branches: POINTER(c_uint64) / POINTER(c_ulong).
PULONG_PTR = POINTER(ULONG_PTR)
# Affinity bitmask type; pointer-sized because it aliases ULONG_PTR.
KAFFINITY = ULONG_PTR
# Pointer-to-DWORD, used for the length argument of GetLogicalProcessorInformationEx.
PDWORD = POINTER(DWORD)
# Declared element count of the trailing arrays (GroupMask, GroupInfo) in the
# structures below.
ANYSIZE_ARRAY = 1
# Error code compared against GetLastError() after the size probe in cpu_count_phys().
ERROR_INSUFFICIENT_BUFFER = 122
class CEnum(c_int):
    """Base type for C enums: an int-sized ctypes value.

    Subclasses list their enumerators as plain integer class attributes.
    """
class GROUP_AFFINITY(Structure):
    """ctypes layout of GROUP_AFFINITY: an affinity mask and its group number."""

    _fields_ = [
        ("Mask", KAFFINITY),        # pointer-sized bitmask
        ("Group", WORD),
        ("Reserved", WORD * 3),
    ]
class LOGICAL_PROCESSOR_RELATIONSHIP(CEnum):
    """Relationship selector passed to GetLogicalProcessorInformationEx.

    The enumerators are plain ints stored as class attributes. Values 5-7 are
    the members added by newer Windows SDKs; the original five and
    ``RelationAll`` are unchanged.
    """

    RelationProcessorCore = 0
    RelationNumaNode = 1
    RelationCache = 2
    RelationProcessorPackage = 3
    RelationGroup = 4
    RelationProcessorDie = 5
    RelationNumaNodeEx = 6
    RelationProcessorModule = 7
    RelationAll = 0xffff
class PROCESSOR_CACHE_TYPE(CEnum):
    """Cache kind stored in CACHE_RELATIONSHIP.Type (int class attributes)."""

    CacheUnified = 0
    CacheInstruction = 1
    CacheData = 2
    CacheTrace = 3
class PROCESSOR_RELATIONSHIP(Structure):
    """ctypes layout of the Win32 PROCESSOR_RELATIONSHIP structure.

    The native structure starts with a ``Flags`` byte followed by
    ``EfficiencyClass``. Without ``Flags`` the ``EfficiencyClass`` field
    would read the flags byte instead. ``GroupCount`` and ``GroupMask`` keep
    the same offsets either way because of WORD alignment.
    """

    _fields_ = [
        ("Flags", BYTE),
        ("EfficiencyClass", BYTE),
        ("Reserved", BYTE * 20),
        ("GroupCount", WORD),
        # Declared with ANYSIZE_ARRAY elements.
        # NOTE(review): presumably GroupCount gives the real number of entries - confirm.
        ("GroupMask", GROUP_AFFINITY * ANYSIZE_ARRAY),
    ]


PPROCESSOR_RELATIONSHIP = POINTER(PROCESSOR_RELATIONSHIP)
class NUMA_NODE_RELATIONSHIP(Structure):
    """ctypes layout of NUMA_NODE_RELATIONSHIP: node number plus one group mask."""

    _fields_ = [
        ("NodeNumber", DWORD),
        ("Reserved", BYTE * 20),
        ("GroupMask", GROUP_AFFINITY),
    ]


PNUMA_NODE_RELATIONSHIP = POINTER(NUMA_NODE_RELATIONSHIP)
class CACHE_RELATIONSHIP(Structure):
    """ctypes layout of CACHE_RELATIONSHIP: cache attributes plus one group mask."""

    _fields_ = [
        ("Level", BYTE),
        ("Associativity", BYTE),
        ("LineSize", WORD),
        ("CacheSize", DWORD),
        ("Type", PROCESSOR_CACHE_TYPE),
        ("Reserved", BYTE * 20),
        ("GroupMask", GROUP_AFFINITY),
    ]


PCACHE_RELATIONSHIP = POINTER(CACHE_RELATIONSHIP)
class PROCESSOR_GROUP_INFO(Structure):
    """ctypes layout of the Win32 PROCESSOR_GROUP_INFO structure.

    ``ActiveProcessorCount`` is a single byte followed by 38 reserved bytes.
    Declaring the count itself as a 38-byte array makes the field return an
    array object instead of the count (and reads it one byte too narrow in
    meaning, though the total structure size is unchanged).
    """

    _fields_ = [
        ("MaximumProcessorCount", BYTE),
        ("ActiveProcessorCount", BYTE),
        ("Reserved", BYTE * 38),
        ("ActiveProcessorMask", KAFFINITY),
    ]


PPROCESSOR_GROUP_INFO = POINTER(PROCESSOR_GROUP_INFO)
class GROUP_RELATIONSHIP(Structure):
    """ctypes layout of GROUP_RELATIONSHIP: group counts plus per-group info."""

    _fields_ = [
        ("MaximumGroupCount", WORD),
        ("ActiveGroupCount", WORD),
        ("Reserved", BYTE * 20),
        # Trailing array declared with ANYSIZE_ARRAY elements.
        ("GroupInfo", PROCESSOR_GROUP_INFO * ANYSIZE_ARRAY),
    ]


PGROUP_RELATIONSHIP = POINTER(GROUP_RELATIONSHIP)
class SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX_DUMMYUNIONNAME(Union):
    """Payload union of SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX.

    All four members overlay the same memory.
    """

    _fields_ = [
        ("Processor", PROCESSOR_RELATIONSHIP),
        ("NumaNode", NUMA_NODE_RELATIONSHIP),
        ("Cache", CACHE_RELATIONSHIP),
        ("Group", GROUP_RELATIONSHIP),
    ]
class SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX(Structure):
    """One record of the GetLogicalProcessorInformationEx output buffer.

    ``Relationship`` and ``Size`` form the header. The union is anonymous, so
    its members (``Processor``, ``NumaNode``, ``Cache``, ``Group``) are
    reachable directly on the record.
    """

    _anonymous_ = ["Dummy"]
    _fields_ = [
        ("Relationship", LOGICAL_PROCESSOR_RELATIONSHIP),
        ("Size", DWORD),
        ("Dummy", SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX_DUMMYUNIONNAME),
    ]


PSYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX = POINTER(SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX)
# Bind kernel32's GetLogicalProcessorInformationEx and declare its prototype:
# (relationship selector, pointer to output records, pointer to byte length) -> BOOL.
GetLogicalProcessorInformationEx = windll.kernel32.GetLogicalProcessorInformationEx
GetLogicalProcessorInformationEx.argtypes = [LOGICAL_PROCESSOR_RELATIONSHIP, PSYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX, PDWORD]
GetLogicalProcessorInformationEx.restype = BOOL
def cpu_count_phys():
    """Return the number of physical processor cores, or None on failure.

    Calls ``GetLogicalProcessorInformationEx`` with ``RelationProcessorCore``
    and counts the variable-length records it writes into the buffer.

    Approach taken from:
    https://github.com/microsoft/verona/blob/7fc162d48ddfa4caa702697e87fe7cfeaa1bb02c/src/rt/sched/cpu.h#L234

    Returns:
        The record count as an int, or ``None`` when the size probe does not
        fail with ERROR_INSUFFICIENT_BUFFER, when the second call fails, or
        when the returned records are malformed.
    """
    relation = LOGICAL_PROCESSOR_RELATIONSHIP.RelationProcessorCore
    length = DWORD(0)

    # Size probe: a NULL buffer with length 0 is expected to fail and leave the
    # required byte count in `length`.
    null_records = PSYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX()
    if GetLogicalProcessorInformationEx(relation, null_records, byref(length)):
        return None
    if GetLastError() != ERROR_INSUFFICIENT_BUFFER:
        return None

    buf = create_string_buffer(length.value)
    records = cast(buf, PSYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX)
    if not GetLogicalProcessorInformationEx(relation, records, byref(length)):
        return None

    # Records are variable-length: each header's Size field is the distance to
    # the next record, so walk the buffer by byte offset.
    total = length.value
    offset = 0
    count = 0
    while offset < total:
        record = cast(byref(buf, offset), PSYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX).contents
        size = record.Size
        # A zero Size would loop forever; a Size past the end means the data
        # cannot be trusted. Treat both as failure.
        if size == 0 or size > total - offset:
            return None
        offset += size
        count += 1
    return count
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment