Skip to content

Instantly share code, notes, and snippets.

@jean-helsinki
Created June 3, 2015 12:24
Show Gist options
  • Select an option

  • Save jean-helsinki/85b3ea97b7445abdd9a7 to your computer and use it in GitHub Desktop.

Select an option

Save jean-helsinki/85b3ea97b7445abdd9a7 to your computer and use it in GitHub Desktop.
# -*- coding: utf-8 -*-
"""
Based on:
https://gist.github.com/meeuw/c3bc9dd07945c87c89e6#file-findfiles-py
https://bitbucket.org/nosklo/pysmbclient/wiki/Home
"""
import os
import pexpect
import re
import locale
import datetime
import time
import logging
__logger__ = logging.getLogger(os.path.basename(__file__))
try:
datetime_strptime = datetime.datetime.strptime
except AttributeError:
# python version older than 2.5
def datetime_strptime(date, format):
return datetime.datetime(*(time.strptime(date, format)[:6]))
_volume_re = re.compile(r"""
Volume:\s # label
\|([^|]*)\|\s # the volume name
serial\snumber\s # another label
0x([a-f0-9]+) # hex serial number
$ # end of line
""", re.VERBOSE)
_smb_header_re = re.compile(r"""
Domain=\[([^]]+)\]\s
OS=\[([^]]+)\]\s
Server=\[([^]]+)\]
$
""", re.VERBOSE)
_file_re = re.compile(r"""
\s* # file lines start with some spaces
(.*?)\s+ # capture filename non-greedy, eating remaining spaces
([ADHSR]*) # capture file mode
\s+ # after the mode you can have any number of spaces
(\d+) # file size
\s+ # spaces after file size
( # begin date capturing
\w{3} # abbrev weekday
\s # space
\w{3} # abbrev month
\s{1,2} # one or two spaces before the day
\d{1,2} # day
\s # a space before the time
\d{2}:\d{2}:\d{2} # time
\s # space
\d{4} # year
) # end date capturing
$ # end of string""", re.VERBOSE)
_cwd_re = re.compile(ur"Current directory is \\\\(?P<host>[^\\]+)\\((?P<share>[^\\]+))(?P<rel_path>.+)")
_SMB_PROMPT = ur'smb:\s.*\>'
def _to_smb_path_fmt(path):
return path.replace(os.sep, '\\')
def _to_local_path_fmt(path):
return path.replace('\\', os.sep)
class SambaClientError(OSError): pass
class SambaClient():
def __init__(self, host, share, username, domain=".", password=""):
smb_cmd = 'smbclient -W {domain} -U "{username}" //{host}/{share}/'.format(**locals())
self.username = username
self.domain = domain
self.host = host
self.share = share
self.p = pexpect.spawn(smb_cmd, )
self._put_pwd(password)
self._runcmd_error_on_data("prompt ON")
def _put_pwd(self, pwd):
self.p.expect(ur'(?i).*password:')
self.p.sendline(pwd)
self.p.expect(_SMB_PROMPT)
def _runcmd(self, command=None, *args):
if command:
f_args = " ".join('"%s"' % arg for arg in args)
full_cmd = command + " " + f_args
__logger__.debug("SMBCLIENT CMD: {0}".format(full_cmd))
self.p.sendline(full_cmd)
self.p.expect(_SMB_PROMPT)
raw_out = self.p.before.decode('utf-8')
return raw_out.replace(full_cmd, "", 1).strip()
def _runcmd_error_on_data(self, cmd, *args):
"""raises SambaClientError if cmd returns any data"""
def term_bs_in_out(s):
return " \b" in s
data = self._runcmd(cmd, *args)
if data and not term_bs_in_out(data):
raise SambaClientError("Error on %r: %r" % (cmd, data))
return data
def lsdir(self, path):
"""
Lists a directory
returns a list of tuples in the format:
[(filename, modes, size, date), ...]
"""
path = os.path.join(path, u'*')
return self.glob(path)
def glob(self, path):
"""
Lists a glob (example: "/files/somefile.*")
returns a list of tuples in the format:
[(filename, modes, size, date), ...]
"""
files = self._runcmd(u'ls', path).splitlines()
for filedata in files:
m = _file_re.match(filedata)
if m:
name, modes, size, date = m.groups()
if name == '.' or name == '..':
continue
size = int(size)
# Resets locale to "C" to parse english date properly
# (non thread-safe code)
loc = locale.getlocale(locale.LC_TIME)
locale.setlocale(locale.LC_TIME, 'C')
date = datetime_strptime(date, '%a %b %d %H:%M:%S %Y')
locale.setlocale(locale.LC_TIME, loc)
yield (name, modes, size, date)
def listdir(self, path):
"""Emulates os.listdir()"""
result = [f[0] for f in self.lsdir(path)]
if not result: # can mean both that the dir is empty or not found
# disambiguation: verifies if the path doesn't exist. Let the error
# raised by _getfile propagate in that case.
self._getfile(path)
return result
def _getfile(self, path):
try:
f = self.glob(path).next()
except StopIteration:
raise SambaClientError('Path not found: %r' % path)
return f
def info(self, path):
"""Fetches information about a file"""
path = _to_smb_path_fmt(path)
data = self._runcmd(u'allinfo', path)
if data.startswith('ERRSRV'):
raise SambaClientError(
'Error retrieving info for %r: %r' % (path, data.strip()))
result = {}
for info in data.splitlines():
k, sep, v = info.partition(':')
if sep:
result[k.strip()] = v.strip()
return result
def diskinfo(self):
"""Fetches information about a volume"""
data = self._runcmd('volume')
for line in data.splitlines():
m = _volume_re.match(line)
if m:
name, serial = m.groups()
return name, int(serial, 16)
else:
raise SambaClientError(
'Error retrieving disk information: %r' % data)
def volume(self):
"""Fetches the volume name"""
return self.diskinfo()[0]
def serial(self):
"""Fetches the volume serial"""
return self.diskinfo()[1]
def isdir(self, path):
"""Returns True if path is a directory/folder"""
return 'D' in self._getfile(path)[1]
def isfile(self, path):
"""Returns True if path is a regular file"""
return not self.isdir(path)
def exists(self, path):
"""Returns True if path exists in the remote host"""
try:
self._getfile(path)
except SambaClientError:
return False
else:
return True
def mkdir(self, path):
"""Creates a new folder remotely"""
path = _to_smb_path_fmt(path)
self._runcmd_error_on_data(u'mkdir', path)
def cd(self, path):
"""Change remote dir"""
path = _to_smb_path_fmt(path)
self._runcmd_error_on_data(u'cd', path)
def lcd(self, path):
"""Change local dir"""
self._runcmd_error_on_data(u'lcd', path)
def rmdir(self, path):
"""Removes a remote empty folder"""
path = _to_smb_path_fmt(path)
self._runcmd_error_on_data(u'rmdir', path)
def unlink(self, path):
"""Removes/deletes/unlinks a file or folder"""
path = _to_smb_path_fmt(path)
if self.isdir(path):
try:
self.set_recurse()
self._runcmd_error_on_data(u'del', path + "\\*")
self.rmdir(path)
finally:
self.set_recurse(False)
else:
self._runcmd_error_on_data(u'del', path)
remove = unlink
def chmod(self, path, *modes):
"""Set/reset file modes
Tested with: AHS
smbc.chmod('/file.txt', '+H')
"""
path = _to_smb_path_fmt(path)
plus_modes = []
minus_modes = []
for mode in modes:
if mode.startswith(u'-'):
minus_modes.append(mode.lstrip(u'-'))
else:
plus_modes.append(mode.lstrip(u'+'))
modes = []
if plus_modes:
modes.append(u'+%s' % u''.join(plus_modes))
if minus_modes:
modes.append(u'-%s' % u''.join(minus_modes))
self._runcmd_error_on_data(u'setmode', u''.join(modes))
def rename(self, old_name, new_name):
old_name = _to_smb_path_fmt(old_name)
new_name = _to_smb_path_fmt(new_name)
self._runcmd_error_on_data(u'rename', old_name, new_name)
def download_file(self, remote_path, local_path):
remote_path = _to_smb_path_fmt(remote_path)
result = self._runcmd(u'get', remote_path, local_path)
def upload_file(self, local_path, remote_path):
remote_path = _to_smb_path_fmt(remote_path)
result = self._runcmd(u'put', local_path, remote_path)
def upload_update(self, local_path, remote_path):
remote_path = _to_smb_path_fmt(remote_path)
result = self._runcmd(u'reput', local_path, remote_path)
def walk(self, top, topdown=True):
names = self.glob(os.path.join(top, "*"))
dirs, nondirs = [], []
for item in names:
if 'D' in item[1]:
dirs.append(item[0])
else:
nondirs.append(item[0])
if topdown:
yield top, dirs, nondirs
for name in dirs:
new_path = os.path.join(top, name)
for x in self.walk(new_path, topdown):
yield x
if not topdown:
yield top, dirs, nondirs
def upload(self, local_path, remote_path):
if os.path.isdir(local_path):
if not self.exists(remote_path):
self.makedirs(remote_path)
for root, dirs, files in os.walk(local_path):
remote_root = root.replace(local_path, _to_local_path_fmt(remote_path))
for d in dirs:
rem_dir = _to_smb_path_fmt(os.path.join(remote_root, d))
if not self.exists(rem_dir):
self.makedirs(rem_dir)
for f in files:
rem_f = _to_smb_path_fmt(os.path.join(remote_root, f))
self.upload_file(os.path.join(root, f), rem_f)
else:
basedir = os.path.dirname(remote_path)
if not self.exists(basedir):
self.makedirs(basedir)
self.upload_file(local_path, remote_path)
def download(self, remote_path, local_path):
if self.isdir(remote_path):
if not os.path.exists(local_path):
os.makedirs(local_path)
for root, dirs, files in self.walk(remote_path):
local_root = root.replace(_to_local_path_fmt(remote_path), local_path)
for d in dirs:
loc_dir = os.path.join(local_root, d)
if not os.path.exists(loc_dir):
os.makedirs(loc_dir)
for f in files:
loc_f = os.path.join(local_root, f)
self.download_file(_to_smb_path_fmt(os.path.join(root, f)), loc_f)
else:
basedir = os.path.dirname(local_path)
if not os.path.exists(basedir):
self.makedirs(basedir)
self.download_file(remote_path, local_path)
def get_cwd(self):
out = self._runcmd(u'pwd').strip()
m = _cwd_re.search(out)
if m:
return m.groupdict()["rel_path"]
else:
raise SambaClientError("pwd command fail")
def makedirs(self, remote_path):
remote_path = _to_smb_path_fmt(remote_path)
dir_items = remote_path.split("\\")
for i in xrange(len(dir_items)):
part_path = "\\".join(dir_items[:i+1])
if not self.exists(part_path):
self.mkdir(part_path)
def set_recurse(self, recurse=True):
cwd = self.get_cwd()
try:
if cwd != "\\": self.cd("\\")
flag = "ON" if recurse else "OFF"
self._runcmd_error_on_data("recurse {0}".format(flag))
finally:
if cwd != "\\": self.cd(cwd)
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, exc_traceback):
self.close()
def __repr__(self):
return '<SambaClient({self.domain}\\{self.username}@//{self.host}/{self.share})>'.format(self=self)
def close(self):
pass
@meeuw
Copy link
Copy Markdown

meeuw commented Aug 31, 2015

nice! thanks for sharing!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment