今まで作ったPython, Ruby, Bashの関数を色々晒してみる
華金なのに飲みもないので、ブログでも書こう。今日は、今までに書いた関数をいくつも晒してみようと思う。RubyとかPythonとかbashとか必要に応じて色々作った関数もそこそこあるので、誰の役に立つかわからないけど、晒してみる。 importとかまでは記してないし、コピペしただけでは動かないのが大半なので、もし利用する場合はググるなり、Twitterでメンション飛ばすなりしてください。
Python
バージョンチェック
最近書くスクリプトではこのチェック必ず入れてる。自分以外の人が使うときに、そこでハマってほしくないので。
def check_python_version(): u""" 実行されているPythonのバージョンが許可したバージョンかの確認を行います。 """ python_version = __get_python_version() expected_python_version = Constants.get_python_version() from hogehoge import log log.output( "REAL PYTHON VERSION: %s, EXPECTED PYTHON VERSION: %s" % (python_version, expected_python_version) ) if python_version != expected_python_version: raise Exception('このPythonバージョンでの実行は許可されていません') def __get_python_version(): u""" 実行されているPythonのバージョンを取得します。 """ __python_version = '%i.%i.%i' % ( sys.version_info[0], sys.version_info[1], sys.version_info[2]) return __python_version
ユーザ周り
バージョンチェック同様、ユーザのバリデーションも欠かせない。
def __get_exec_user(): u""" スクリプトの実行ユーザを返します。 """ return pwd.getpwuid(os.getuid())[0] def check_exec_user(): u""" 実行ユーザのバリデーションをします。 """ exec_user = __get_exec_user() expected_exec_user = Constants.get_exec_user() from hogehoge import log log.output( "REAL EXEC USER: %s, EXPECTED EXEC USER: %s" % (exec_user, expected_exec_user) ) if exec_user != expected_exec_user: raise Exception('このユーザでの実行は許可されていません')
ロック
重複実行は避けたいので。コマンドラインツールとかだと使う。
def create_lock_file(): u""" ロックファイルを作成します。 既にロックファイルが存在する場合は例外をスローします。 """ __lock_file_path = __get_lock_dir_path() + __get_lock_file_name() __check_duplicate_execution() try: # ファイル作成 log.output('create lock file: ' + __lock_file_path, 'info') fp = open(__lock_file_path, "w") except Exception as inst: print(type(inst)) log.output('READ ERROR', 'warning') finally: fp.close def delete_lock_file(): u""" ロックファイルを削除します。 """ __lock_file_path = __get_lock_dir_path() + __get_lock_file_name() log.output('delete lock file: ' + __lock_file_path, 'info') if os.path.exists(__lock_file_path): os.remove(__lock_file_path) def __get_lock_file_name(): u""" ロックファイルの名前を取得します。 """ __env = retrieve_env() return CONSTANTS[__env]['lock']['FILE_NAME'] def __create_lock_dir(): u""" ロックファイル用ディレクトリを作成します。 該当ディレクトリ存在しない場合のみ、ディレクトリを作成します。 """ __lock_file_dir = __get_lock_dir_path() if not os.path.isdir(__lock_file_dir): os.mkdir(__lock_file_dir) # os.makedirs(LOCK_FILE_DIR) def __get_lock_dir_path(): u""" ロックファイル用ディレクトリのパスを取得します。 ディレクトリが指定されていない場合は、bin/ディレクトリを返すようにしています。 """ __env = retrieve_env() __lock_file_dir = CONSTANTS[__env]['lock']['FILE_DIR'] __bin_dir = CONSTANTS[__env]['apps']['BIN_DIR'] # 変数が空の場合は、binディレクトリ指定して返す if not __lock_file_dir: __lock_file_dir = get_home_dir(__file__) + '/' + __bin_dir + '/' return __lock_file_dir def __check_duplicate_execution(): u""" 二重実行されていないかのチェックを行います。 ロックファイルが既に作成されている場合は、二重実行されていると判断し、例外をスローします。 """ __lock_file_path = __get_lock_dir_path() + __get_lock_file_name() # 存在確認 log.output('check lock file existence: ' + __lock_file_path, 'info') if os.path.exists(__lock_file_path): raise Exception('すでにロックファイルが存在します')
ホームディレクトリを返す
テストでアプリのコードをimportする際にパス通すときに使う。それ以外では特に使わない。
def get_home_dir(file): u""" アプリケーションのホームディレクトリを返します。 """ return os.path.split(os.path.dirname(os.path.abspath(file)))[0]
環境変数取得
環境変数に応じて設定値を変えたいときなどに使う。Rubyだとsettingslogicとかあるのに、Pythonではどうも見つからないので、これが欠かせない。
def retrieve_env(): u""" 環境変数を参照し、スクリプト実行環境を判別します。 実行時デフォルトはdefaultを返すようになっています 「SCRIPT_ENV=production hogehoge.py」のように実行することで、 環境に応じたコンフィグを得ることができます。 """ __env = 'DEFAULT' if 'SCRIPT_ENV' in os.environ: __env = os.environ['SCRIPT_ENV'] return __env
ファイル・ディレクトリ周り
ラップしただけだけど。
def is_exist_file(filename): u""" ファイルが存在するかの確認をします。 """ return os.path.exists(filename) def is_exist_dir(path): u""" ディレクトリが存在するかの確認をします。 """ return os.path.isdir(path) def create_dir(path): u""" ディレクトリを作成します。 """ return os.makedirs(path) def delete_recursive_dir(path): u""" ディレクトリを再帰的に削除します。 """ return shutil.rmtree(path) def find_latest_created_dir(target_path): u""" 対象ディレクトリのなかで、 最も最近に作られたディレクトリをフルパスで返します。 """ import glob __files = glob.glob(target_path) __files.sort(key=os.path.getmtime, reverse=True) return __files[0]
OSコマンド
こういうのはあったほうが便利ですな。ちょくちょく使う。
def exec_command(cmd='hostname'): u""" OSのコマンドを実行します。 例外はキャッチしていないため、必要に応じて上位でキャッチしてください。 """ subproc_args = {'stdin': None, 'stdout': subprocess.PIPE, 'stderr': subprocess.PIPE, 'close_fds': True} p = subprocess.Popen(cmd.strip().split(" "), **subproc_args) stdout_data, stderr_data = p.communicate() return p.returncode, stdout_data, stderr_data
SSHクライアント
昔書いた。便利で、状況に応じて使う。
class Ssh: def __init__(self, user_name, remote_host, port, private_key, timeout): self.user = user_name self.host = remote_host self.port = port self.private_key = private_key self.timeout = timeout def _command_execute(self, command): u""" リモートホストに対し、コマンドを実行します。 paramikoモジュール内でエラーハンドリングができなかったため、 exec_command時に標準エラー出力があれば、例外をスローしています。 """ from hogehoge import log log.output('Command Execute: ' + command, 'debug') try: self.__create_connection() stdin, stdout, stderr = self.client.exec_command(command) stdout_msg = stdout.read() stderr_msg = stderr.read() # エラー出力があれば、例外スロー if stderr_msg: stderr_decoded = stderr_msg.decode(sys.stdout.encoding) log.output(stderr_decoded, 'warning') raise Exception stdout_decoded = stdout_msg.decode(sys.stdout.encoding) return stdout_decoded except paramiko.SSHException as e: log.output("Password is invalid:" + str(e)) except paramiko.AuthenticationException as e: log.output("We had an authentication exception!" + str(e)) except Exception as e: log.output("### SSH Client Exception Catched! ###" + str(e)) def __create_connection(self): u""" コネクションを作成します。 """ self.conn = self.client.connect( hostname=self.host, port=self.port, username=self.user, key_filename=self.private_key, timeout=self.timeout ) return self.conn def __enter__(self): u""" オブジェクト生成時にSSHクライアントを生成します。 """ self.client = paramiko.SSHClient() self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) return self def __exit__(self, exec_type, exec_value, traceback): self.client.close()
rsync
paramikoが再帰的にSCPしてくれなかったので、書いた。
def upload_dir(host, local_dir, remote_dir): u""" 対象ホストにディレクトリをアップロードします。 """ remote = '{}:{}'.format(host, remote_dir) (__exit_status, __stdout, __stderr) = exec_command('rsync -av {} {}'.format(local_dir, remote)) return __exit_status, __stdout.decode('utf-8'), __stderr.decode('utf-8')
時刻周り
あったほうが便利かな。
def get_date(): u""" 日時を取得します。 """ d = datetime.datetime.today() return d.strftime("%Y%m%d") def get_time(): u""" 時刻を取得します。 """ d = datetime.datetime.today() return d.strftime("%H%M%S")
HTTP
これもたまに使う。
def send_get_request(server, path, header, timeout): url = PROTOCOL_PREFIX + server + path r = requests.get(url, headers=header, timeout=timeout) status_code = r.status_code return status_code def send_put_request(server, path, header, timeout, data): url = PROTOCOL_PREFIX + server + path r = requests.put(url, data=data, headers=header, timeout=timeout) status_code = r.status_code return status_code def check_status_code(method, code): if method == 'get': flg = True if code == 200 else False elif method == 'put': flg = True if (code in [201, 204]) else False elif method == 'post': # For "private api test" flg = True if (code in [400]) else False else: sys.exit(1) return flg
メール
これはごくまれに使う。今は通知部分はJenkinsとかでやるし、そこまで使わないかも。
class Mail: def _create_connection(self): __conn = smtplib.SMTP(MAIL_HOST) #__conn.set_debuglevel(True) return __conn def __create_message(self, to, mail_subject, mail_body): __message = {} __message = MIMEText(mail_body) __message['From'] = MAIL_FROM __message['To'] = to __message['Subject'] = mail_subject return __message def send_mail(self, to, mail_subject, mail_body): __conn = self._create_connection() __message = self.__create_message(to, mail_subject, mail_body) __conn.send_message(__message) __conn.close()
SVN周り
昔必要になり書いた。SVNのコミットデータをうまく抽出するって地味に面倒だったりする。
def get_commit_file_name(commit_file_line): u""" """ __commit_file_info = __extract_commit_file_name(commit_file_line) return __commit_file_info def __extract_commit_file_name(commit_file_line): u""" """ __pattern = r'(Adding|Sending)\s+(.*)' regex = re.compile(__pattern, re.MULTILINE) for match in regex.finditer(commit_file_line): return match.group(2) def collect_commited_files(log_file_path): __commited_files = [] #print('this is log path: %s' % log_file_path) for line in open(log_file_path, 'r'): __commited_file_name = get_commit_file_name(line) # マッチしたもののみ出力 if __commited_file_name: __commited_files.append(__commited_file_name) return __commited_files
bash
logging機能とかもう少し優秀だったらいいのにと思いつつ。
ログ
function append_log() { [[ "$#" -ne 1 ]] && echo "${FUNCNAME} 'an argment insufficiency'" && return 1 local msg="${1}" local now_time=$( date '+%Y%m%d_%H%M%S' ) eval 'echo "[${now_time}] ${msg}" 2>&1' | tee -a "${LOG_FILE}" }
ロック
Pythonのやつと同様、コマンドラインツールとかだと使うかな。こちらで想定した使い方しないケースもあったりするので、事前に重複実行は避けておきたい。
function create_lock_file() { touch "${LOCK_FILE}" local cmd_ret="${?}" if [ $? -eq 0 ]; then return "${cmd_ret}" else err_msg '[ERROR] create lock file FAILED!!' exit 1 fi } function delete_lock_file() { rm -rf "${LOCK_FILE}" local cmd_ret="${?}" if [ $? -eq 0 ]; then return "${cmd_ret}" else err_msg '[ERROR] delete lock file FAILED!!' exit 1 fi } function check_lock_file() { if [ -f "${LOCK_FILE}" ]; then err_msg "[ERROR] lock file FOUND. Dupilicate exec FAILED!![file_path: ${LOCK_FILE}]" exit 1 fi }
引数
コマンドライン引数チェックは自前ですな。
function check_opt_num() { [[ "$#" -ne 2 ]] && echo "${FUNCNAME} 'an argment insufficiency'" && return 1 local opt_num="${1}" local required_num="${2}" if [ "${opt_num}" -ne "${required_num}" ]; then echo '[ERROR] Insufficiency options.' usage exit 1 fi }
ユーザ
Python同様ユーザチェック。
function is_allow_exec_user() { [[ "$#" -ne 2 ]] && echo "${FUNCNAME} 'an argment insufficiency'" && return 1 local exec_user="${1}" local allow_user="${2}" if [ "${exec_user}" != "${allow_user}" ]; then err_msg "[ERROR] this user is not ALLOWED!!! [exec_user: ${exec_user}]" exit 1 else logger "[Success]: this user is allowed. [exec_user: ${exec_user}]" fi }
SSH
これも必要に応じて使う感じ。例外処理めんどいからあまりbashではやりたくないのだけれど、しょうがなかった。
function exec_cmd_with_ssh() { [[ "$#" -ne 3 ]] && echo "${FUNCNAME} 'an argment insufficiency'" && return 1 local user="${1}" local host="${2}" local cmd="${3}" ssh -n "${user}"@"${host}" "${cmd}" # Below if statemet is worked for 'shebang is [/bin/bash] only' # if you use -eu option, it does not work. local cmd_ret="${?}" if [ $cmd_ret -eq 0 ]; then return 0 else err_msg '[ERROR] "Invalid cmd return" was called...' return 1 fi }
メール
これも昔必要があって、作った。
function send_message { [[ "$#" -ne 3 ]] && echo "${FUNCNAME} 'an argment insufficiency'" && return 1 local mail_to="${1}" local mail_subject="${2}" local mail_content="${3}" eval 'echo "${mail_content}" | /bin/mail -s "${mail_subject}" "${mail_to}"' if [ "$?" -ne 0 ]; then echo '[ERROR] send mail FAILED!!' exit 1 fi }
Ruby
Jenkinsはchangelogから差分抽出できるのでよいですね。
Jenkins
def extract_commited_files_from_changelog() commit_files_list = [] changelog = "#{ENV['JENKINS_HOME']}/jobs/#{ENV['JOB_NAME']}/builds/#{ENV['BUILD_NUMBER']}/changelog.xml" begin fh = open(changelog,'r') fh.each do | line | if /^:/ =~ line # extract commited file_path commit_info = line.strip.split(/\s+/) if commit_info[5].nil? raise 'changelog parse error. cannot get filename correctly' else commit_files_list.push commit_info[5] end end end rescue Errno::ENOENT => e raise "[critical] cannot open: #{changelog}" exit(1) ensure fh.close end commit_files_list.sort.uniq end
HTTP
リトライ付き。あると便利。
class HttpClient def get_url(base_url, request_path) begin failed ||= 0 timeout(10) do response_status = Net::HTTP.get_response(base_url, request_path) validate_status_code(response_status.code, request_path) end rescue Timeout::Error puts "Timeout Occur! URL: #{base_url}#{request_path}" rescue RuntimeError failed += 1 puts "status code error: retry is started" retry if failed < 3 end end def validate_status_code(code, request_path) if code.to_i == 200 || code.to_i == 404 || code.to_i == 400 #puts "#{request_path}, #{code}" else raise "status code is NOT 200, but #{code}, #{request_path}" end end end
終わりに
こうやってみると、スタイルもバラバラだし、コマンドラインに特化したものばかりがどうしても多いなという印象。 例外とかコメントがもう少し綺麗に書けるといいなぁ。あとから読むとどうも分かりづらいので、がんばろう。