Created
April 12, 2020 03:55
-
-
Save ThunderEX/67c6f6a939da28c0316a699449190f15 to your computer and use it in GitHub Desktop.
psutil.cpu_count(logical=False) in ctypes
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from ctypes import LibraryLoader, WinDLL, WinError, GetLastError | |
| from ctypes import Structure, Union, POINTER, create_string_buffer, cast, byref | |
| from ctypes import c_int, c_uint, c_long, c_ulong, c_int64, c_uint64 | |
| from ctypes.wintypes import BOOL, BYTE, WORD, DWORD, WCHAR, HANDLE | |
| from platform import machine | |
| windll = LibraryLoader(WinDLL) | |
| # Windows API from https://github.com/Dobatymo/ctypes-windows-sdk/blob/6f984f4753fcd83cd57f045b204d0be0d702d41c/cwinsdk/um/sysinfoapi.py | |
| X64 = machine().endswith('64') | |
| if X64: | |
| INT_PTR = c_int64 | |
| UINT_PTR = c_uint64 | |
| LONG_PTR = c_int64 | |
| ULONG_PTR = c_uint64 | |
| PULONG_PTR = POINTER(c_uint64) | |
| else: | |
| INT_PTR = c_int | |
| UINT_PTR = c_uint | |
| LONG_PTR = c_long | |
| ULONG_PTR = c_ulong | |
| PULONG_PTR = POINTER(c_ulong) | |
| KAFFINITY = ULONG_PTR | |
| PDWORD = POINTER(DWORD) | |
| ANYSIZE_ARRAY = 1 | |
| ERROR_INSUFFICIENT_BUFFER = 122 | |
| class CEnum(c_int): | |
| pass | |
| class GROUP_AFFINITY(Structure): | |
| _fields_ = [ | |
| ("Mask", KAFFINITY), | |
| ("Group", WORD), | |
| ("Reserved", WORD*3), | |
| ] | |
| class LOGICAL_PROCESSOR_RELATIONSHIP(CEnum): | |
| RelationProcessorCore = 0 | |
| RelationNumaNode = 1 | |
| RelationCache = 2 | |
| RelationProcessorPackage = 3 | |
| RelationGroup = 4 | |
| RelationAll = 0xffff | |
| class PROCESSOR_CACHE_TYPE(CEnum): | |
| CacheUnified = 0 | |
| CacheInstruction = 1 | |
| CacheData = 2 | |
| CacheTrace = 3 | |
| class PROCESSOR_RELATIONSHIP(Structure): | |
| _fields_ = [ | |
| ("EfficiencyClass", BYTE), | |
| ("Reserved", BYTE*20), | |
| ("GroupCount", WORD), | |
| ("GroupMask", GROUP_AFFINITY*ANYSIZE_ARRAY), | |
| ] | |
| PPROCESSOR_RELATIONSHIP = POINTER(PROCESSOR_RELATIONSHIP) | |
| class NUMA_NODE_RELATIONSHIP(Structure): | |
| _fields_ = [ | |
| ("NodeNumber", DWORD), | |
| ("Reserved", BYTE*20), | |
| ("GroupMask", GROUP_AFFINITY), | |
| ] | |
| PNUMA_NODE_RELATIONSHIP = POINTER(NUMA_NODE_RELATIONSHIP) | |
| class CACHE_RELATIONSHIP(Structure): | |
| _fields_ = [ | |
| ("Level", BYTE), | |
| ("Associativity", BYTE), | |
| ("LineSize", WORD), | |
| ("CacheSize", DWORD), | |
| ("Type", PROCESSOR_CACHE_TYPE), | |
| ("Reserved", BYTE*20), | |
| ("GroupMask", GROUP_AFFINITY), | |
| ] | |
| PCACHE_RELATIONSHIP = POINTER(CACHE_RELATIONSHIP) | |
| class PROCESSOR_GROUP_INFO(Structure): | |
| _fields_ = [ | |
| ("MaximumProcessorCount", BYTE), | |
| ("ActiveProcessorCount", BYTE*38), | |
| ("ActiveProcessorMask", KAFFINITY), | |
| ] | |
| PPROCESSOR_GROUP_INFO = POINTER(PROCESSOR_GROUP_INFO) | |
| class GROUP_RELATIONSHIP(Structure): | |
| _fields_ = [ | |
| ("MaximumGroupCount", WORD), | |
| ("ActiveGroupCount", WORD), | |
| ("Reserved", BYTE*20), | |
| ("GroupInfo", PROCESSOR_GROUP_INFO*ANYSIZE_ARRAY), | |
| ] | |
| PGROUP_RELATIONSHIP = POINTER(GROUP_RELATIONSHIP) | |
| class SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX_DUMMYUNIONNAME(Union): | |
| _fields_ = [ | |
| ("Processor", PROCESSOR_RELATIONSHIP), | |
| ("NumaNode", NUMA_NODE_RELATIONSHIP), | |
| ("Cache", CACHE_RELATIONSHIP), | |
| ("Group", GROUP_RELATIONSHIP), | |
| ] | |
| class SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX(Structure): | |
| _anonymous_ = ["Dummy"] | |
| _fields_ = [ | |
| ("Relationship", LOGICAL_PROCESSOR_RELATIONSHIP), | |
| ("Size", DWORD), | |
| ("Dummy", SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX_DUMMYUNIONNAME), | |
| ] | |
| PSYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX = POINTER(SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX) | |
| GetLogicalProcessorInformationEx = windll.kernel32.GetLogicalProcessorInformationEx | |
| GetLogicalProcessorInformationEx.argtypes = [LOGICAL_PROCESSOR_RELATIONSHIP, PSYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX, PDWORD] | |
| GetLogicalProcessorInformationEx.restype = BOOL | |
| def cpu_count_phys(): | |
| '''https://github.com/microsoft/verona/blob/7fc162d48ddfa4caa702697e87fe7cfeaa1bb02c/src/rt/sched/cpu.h#L234''' | |
| relation = LOGICAL_PROCESSOR_RELATIONSHIP.RelationProcessorCore | |
| ptr = PSYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX() | |
| length = DWORD(0) | |
| if GetLogicalProcessorInformationEx(relation, ptr, byref(length)): | |
| return None | |
| if GetLastError() != ERROR_INSUFFICIENT_BUFFER: | |
| return None | |
| buffer = create_string_buffer(length.value) | |
| ptr = cast(buffer, PSYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX) | |
| if not GetLogicalProcessorInformationEx(relation, ptr, byref(length)): | |
| return None | |
| count = 0 | |
| remain = length.value | |
| while remain > 0: | |
| remain -= ptr.contents.Size | |
| ptr = cast(byref(ptr.contents, ptr.contents.Size), PSYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX) | |
| count += 1 | |
| return count |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment