可読性を保ちつつデータをエスケープするおもちゃ

概要

CSVデータをsort,uniq,cut,awkだけで分析したいがそうはいかないことがある。
データ中にカンマが入っている場合、cutでは取得する列がずれる場合がある。さらに改行文字が入っている場合もうまくいかない。
その場合には、適切なCSVライブラリを使ってエスケープ・エスケープ解除をしなければならない。

データを全てbase64などでエンコードしてしまえば、改行文字やカンマなどの危険な文字は入ってこない。
ただし、その場合は人の可読性が全くなくなる。
可読性をそこそこに保ったまま危険な文字をエスケープする方法として、Quoted-printableを使うことが考えられる。
https://ja.wikipedia.org/wiki/Quoted-printable
これは、asciiの英数字と一部の記号はそのまま表示するが、ascii制御記号などの1バイトを、"=0A"などイコールの後に十六進数2文字で符号化する方式である。

ただし、日本語文を含むデータでQuoted-printableによるエスケープを行うと、日本語部分は符号化されて読めなくなる。
UTF8の漢字・ひらがな・カタカナはデータ中に含まれていてもsort,uniq,cut,awkするのにほとんど問題はない(※1,2)ため、できれば漢字・ひらがな・カタカナもそのまま残しておきたい。
符号化しない漢字の基準としては、とりあえずJIS第1第2水準漢字とする。
まとめると、以下のようなエスケープルールを作りたい。

  • 符号化しないバイト
    • アルファベット
    • 数字
    • 比較的安全なascii記号
    • JIS X 0208に含まれる文字に該当するバイト列 (UTF-8であることが前提)
  • 符号化するバイト
    • 「符号化しないバイト」以外全て
    • 例:ascii制御記号(タブ、CR、LFなど)、比較的危険なascii記号(カンマ、ダブルクオートなど) 、私が読めない文字(アラビア文字など)

試しにpythonで作った。パフォーマンスはかなり悪い。

(※1)UTF8では、asciiの1バイト文字では先頭ビットが0、漢字のようなマルチバイト文字では各バイトの先頭ビットが全て1になる。そのためマルチバイト文字中の1バイトをascii文字として解釈することはない。
https://ja.wikipedia.org/wiki/UTF-8

(※2)マルチバイト文字を含むテキストでsort,uniqするときには環境変数LC_ALL=Cを設定しておいたほうがいい.
https://linuxjm.osdn.jp/html/GNU_coreutils/man1/sort.1.html
google:sort uniq LC_ALL=C

実行サンプル

準備

まずはunicode.orgからJIS X 0208の文字一覧を取得してきて、扱いやすくするためJSONに変換する。

$ wget http://unicode.org/Public/MAPPINGS/OBSOLETE/EASTASIA/JIS/JIS0208.TXT
$ python create_jisx_json.py > jisx0208.json

サンプルデータとして、wikipediaのHyperText_Markup_Languageのページ( https://ja.wikipedia.org/wiki/HyperText_Markup_Language )からHTMLを用意。

<!DOCTYPE html>
<html lang="ja">
 <head>
  <meta charset="UTF-8">
  <link rel="author" href="mailto:mail@example.com">
  <title lang="en">HyperText Markup Language - Wikipedia</title>
 </head>
 <body>
  <article>
   <h1 lang="en">HyperText Markup Language</h1>
   <p>HTMLは、<a href="http://ja.wikipedia.org/wiki/SGML">SGML</a>
      アプリケーションの一つで、ハイパーテキストを利用してワールド
      ワイドウェブ上で情報を発信するために作られ、
      ワールドワイドウェブの<strong>基幹的役割</strong>をなしている。
      情報を発信するための文書構造を定義するために使われ、
      ある程度機械が理解可能な言語で、
      写真の埋め込みや、フォームの作成、
      ハイパーテキストによるHTML間の連携が可能である。</p>
  </article>
 </body>
</html>
エンコード

改行文字を含むascii制御文字は全てエスケープするため、確実に一行で表すことができる。

$ cat sample.html |python jisquopri_encode.py
<!DOCTYPE=20html>=0A<html=20lang=3D=22ja=22>=0A=20<head>=0A=20=20<meta=20charset=3D=22UTF-8=22>=0A=20=20<link=20rel=3D=22author=22=20href=3D=22mailto:mail@example.com=22>=0A=20=20<title=20lang=3D=22en=22>HyperText=20Markup=20Language=20-=20Wikipedia</title>=0A=20</head>=0A=20<body>=0A=20=20<article>=0A=20=20=20<h1=20lang=3D=22en=22>HyperText=20Markup=20Language</h1>=0A=20=20=20<p>HTMLは、<a=20href=3D=22http://ja.wikipedia.org/wiki/SGML=22>SGML</a>=0A=20=20=20=20=20=20アプリケーションの一つで、ハイパーテキストを利用してワールド=0A=20=20=20=20=20=20ワイドウェブ上で情報を発信するために作られ、=0A=20=20=20=20=20=20ワールドワイド ウェブの<strong>基幹的役割</strong>をなしている。=0A=20=20=20=20=20=20情報を発信するための文書構造を定義するために使われ、=0A=20=20=20=20=20=20ある程度機械が理解可能な言語で、=0A=20=20=20=20=20=20写真の埋め込み や、フォームの作成、=0A=20=20=20=20=20=20ハイパーテキストによるHTML間の連携が可能である。</p>=0A=20=20</article>=0A=20</body>=0A</html>=0A
エンコード(改行あり)

適当な位置で改行するオプションを使用(元データの改行位置とは関係ない)。
"DOCTYPE"や"アプリケーション"などの文字列はエスケープされないが、スペースやダブルクオートはエスケープされる。

$ cat sample.html |python jisquopri_encode.py --newline 40
<!DOCTYPE=20html>=0A<html=20lang=3D=22ja=22>=0A=20<head>
=0A=20=20<meta=20charset=3D=22UTF-8=22>=0A=20=20<link=20rel=3D=22a
uthor=22=20href=3D=22mailto:mail@example.com=22>=0A=20
=20<title=20lang=3D=22en=22>HyperText=20Markup=20Langu
age=20-=20Wikipedia</title>=0A=20</head>=0A=20<body>
=0A=20=20<article>=0A=20=20=20<h1=20lang=3D=22en=22>HyperText=20
Markup=20Language</h1>=0A=20=20=20<p>HTMLは、<a=20href
=3D=22http://ja.wikipedia.org/wiki/SGML=22>SGM
L</a>=0A=20=20=20=20=20=20アプリケーションの一つで、ハイパーテキストを利用してワー
ルド=0A=20=20=20=20=20=20ワイドウェブ上で情報を発信するために作られ、=0A=20=20=20=20=20=20ワー
ルドワイドウェブの<strong>基幹的役割</strong>をなしている。=0A=20
=20=20=20=20=20情報を発信するための文書構造を定義するために使われ、=0A=20=20=20=20=20=20ある
程度機械が理解可能な言語で、=0A=20=20=20=20=20=20写真の埋め込みや、フォームの作成、=0A=20
=20=20=20=20=20ハイパーテキストによるHTML間の連携が可能である。</p>=0A=20=20</
article>=0A=20</body>=0A</html>=0A
適当な圧縮データをエンコード

任意のバイト列をエンコードすることが可能。
ほとんどマルチバイト文字は現れないが、6行目にキリル文字のшが出現している。

$ cat create_jisx_json.py| gzip | python jisquopri_encode.py --newline 40
=1F=8B=08=00=22=C7=0DY=00=03=85R=B1n=C3=20=10=DD=F9=0AD=17=90=1C=E4z=A8=A2H^+=B5k3TJ=AC=88=1A=DC
=5CD=C0=82s=AB=AA=EA=BF=17l=27n=A6=DEb=B8w=EF=DD=F1=CEp=EE}@z=8A=DE=91=13D=0B=11iMw=0D=F9=04
<R=DF=1B=C7=D9=F3=D3KY=95k=B9}=DD=B2=82=B2=C0=04U=91v=1BBSt>P=0B=CEPp=B4=93=C1(=9Do=91
=8B=09=CE1=C2=F5=F8=91!b=80=9E=8B+=08=DD=08=EC=CA=86=D65ewl=E1=E5h=BDCp=83=B9&=B5=C2=8BV=EC-
=20g{d=8B=DC=E0=A0=F5=DA=1C@=A7*=F3=A1=2CO=84]=D5=FC=A9h=8F*$p=A9=94=E8=0Fo_=98=86=AE=0A=96$
=D1=1A&=A46=19=E6l=C0nu=FF=B0=CA=B9E=03=BB=F5D=C9f=8D=A7ш=E9=94=9C=18=9BH=E3=16=895=13=CD-=DF=1A
=F7=9E<N=8FI&/=8A7=D6=CC=C3&c=F6=FB=FF=9C=99=D7=27U=9F=B6=A6=F97=CB=5C=B6=994=D2=DEr=8B=B1C=CE
]=DB=CD=C04=CC=05=99n?=82=10=D2=07p=C8=F3=FF!=F5p=EE#=9F=DB=14=D4=B88=04sP=B1=05=A8=1F=95=8D=A6H
O=D7=C6a]=09A~=01*}=BF=A3Z=02=00=00
デコード

エンコードしたデータを元のバイト列に戻すことも可能

$ cat sample.html |python jisquopri_encode.py | python jisquopri_decode.py
<!DOCTYPE html>
<html lang="ja">
 <head>
  <meta charset="UTF-8">
  <link rel="author" href="mailto:mail@example.com">
  <title lang="en">HyperText Markup Language - Wikipedia</title>
 </head>
 <body>
  <article>
   <h1 lang="en">HyperText Markup Language</h1>
   <p>HTMLは、<a href="http://ja.wikipedia.org/wiki/SGML">SGML</a>
      アプリケーションの一つで、ハイパーテキストを利用してワールド
      ワイドウェブ上で情報を発信するために作られ、
      ワールドワイドウェブの<strong>基幹的役割</strong>をなしている。
      情報を発信するための文書構造を定義するために使われ、
      ある程度機械が理解可能な言語で、
      写真の埋め込みや、フォームの作成、
      ハイパーテキストによるHTML間の連携が可能である。</p>
  </article>
 </body>
</html>

コード(NYSLライセンス)

JISQuopri.py

import json

class JISQuopriEncode(object):
    CONTROLS = list(range(0x00, 0x20)) + [0x7f]
    MSBS = list(range(0x80, 0x100))
    ASCII_NOT_PRINTABLE = CONTROLS + MSBS

    BYTE_TO_BYTES = [i.to_bytes(1, "little") for i in range(256)]

    UTF8_3BYTE_MAX = 239
    UTF8_3BYTE_MIN = 224

    UTF8_2BYTE_MAX = 223
    UTF8_2BYTE_MIN = 192

    TO_HEXBYTES = [
        b"0",b"1",b"2",b"3",b"4",b"5",b"6",b"7",b"8",b"9",b"A",b"B",b"C",b"D",b"E",b"F"
    ]
    JISX_JSON_PATH = "jisx0208.json"

    def __init__(self, escape_char=b'=', escape_ascii_chars=[], escape_multibyte_chars=[], newline=None, ascii_only=False):
        self.escape_char = escape_char
        self.escape_byte = escape_char[0]
        self.ascii_only = ascii_only

        if isinstance(newline, int) and newline > 0:
            self.newline = newline
        else:
            self.newline = None

        self.create_table(escape_ascii_chars)
        if not ascii_only:
            self.create_multibyte_table(escape_multibyte_chars)


    def create_table(self, escape_target_src):
        escape_target = []
        for t in escape_target_src:
            if isinstance(t, str):
                escape_target.append(t.encode("ascii")[0])
            elif isinstance(t, bytes):
                escape_target.append(t[0])
            else:
                escape_target.append(t)
        escape_target.append(self.escape_byte)

        byte_to_encoded = [b""]*256
        for i in range(256):
            if i in escape_target:
                first = int(i / 16)
                second = i % 16
                val = self.escape_char + self.TO_HEXBYTES[first] + self.TO_HEXBYTES[second]
            else:
                val = self.BYTE_TO_BYTES[i]
            byte_to_encoded[i] = val

        self._tbl = byte_to_encoded

    def create_multibyte_table(self, escape_target_src):
        with open(self.JISX_JSON_PATH, "r") as f:
            jisx_list = json.load(f)

        self._mtbl = {}
        for char in jisx_list:
            if char["char"] in escape_target_src:
                continue

            if char["utf8length"] == 3:
                idx = char["utf8bytes"][0] * 65536 + char["utf8bytes"][1] * 256 + char["utf8bytes"][2]
            elif char["utf8length"] == 2:
                idx = char["utf8bytes"][0] * 65536 + char["utf8bytes"][1] * 256
            else:
                raise Exception

            self._mtbl[idx] = b"".join([self.BYTE_TO_BYTES[b] for b in char["utf8bytes"]])


    def encode(self, bytes_data):
        if self.ascii_only:
            return self._encode_onebyte(bytes_data)
        else:
            return self._encode_multibyte(bytes_data)

    def _encode_onebyte(self, bytes_data):
        result = []
        line_length = 0
        for b in bytes_data:
            result.append(self._tbl[b])
            if self.newline:
                line_length += 1
                if line_length >= self.newline:
                    result.append(b"\n")
                    line_length = 0

        if len(result) > 0 and result[-1] == b"\n":
            result = result[:-1]

        return b"".join(result)

    def _encode_multibyte(self, bytes_data):
        result = []
        line_length = 0

        len_bytes = len(bytes_data)
        i = 0
        while(i < len_bytes):
            b = bytes_data[i]
            success = False
            if (b >= self.UTF8_3BYTE_MIN) and (b <= self.UTF8_3BYTE_MAX) and (i + 2 < len_bytes):
                idx = b * 65536 + bytes_data[i+1] * 256 + bytes_data[i+2]
                val = self._mtbl.get(idx)
                if val is not None:
                    success = True
                    result.append(val)
                    i += 3
            if (not success) and (b >= self.UTF8_2BYTE_MIN) and (b <= self.UTF8_2BYTE_MAX) and (i + 1 < len_bytes):
                idx = b * 65536 + bytes_data[i+1] * 256
                val = self._mtbl.get(idx)
                if val is not None:
                    success = True
                    result.append(val)
                    i += 2
            if (not success):
                result.append(self._tbl[b])
                i += 1

            if self.newline:
                line_length += 1
                if line_length >= self.newline:
                    result.append(b"\n")
                    line_length = 0

        if len(result) > 0 and result[-1] == b"\n":
            result = result[:-1]

        return b"".join(result)

class JISQuopriDecode(object):
    CONTROLS = list(range(0x00, 0x20)) + [0x7f]
    BYTE_TO_BYTES = [i.to_bytes(1, "little") for i in range(256)]
    FROM_HEXBYTES = [None]*48 + [0,1,2,3,4,5,6,7,8,9] + [None]*7 + [10,11,12,13,14,15]

    def __init__(self, escape_char=b'='):
        self.escape_char = escape_char
        self.escape_byte = escape_char[0]

        self.is_control_tbl = [(i in self.CONTROLS) for i in range(256)]
        self.decode_state = 0

    def decode(self, bytes_data):
        result = []
        for b in bytes_data:
            if self.is_control_tbl[b]:
                continue
            elif self.decode_state == 1:
                self.decode_first_4bit = self.FROM_HEXBYTES[b] * 16
                self.decode_state = 2
            elif self.decode_state == 2:
                result.append(
                    self.BYTE_TO_BYTES[self.decode_first_4bit + self.FROM_HEXBYTES[b]]
                )
                self.decode_state = 0
            elif b == self.escape_byte:
                self.decode_state = 1
            else:
                result.append(self.BYTE_TO_BYTES[b])
        return b"".join(result)

jisquopri_encode.py

import sys
import argparse
from JISQuopri import JISQuopriEncode

if __name__ == "__main__":
    BUFSIZE = 1024

    parser = argparse.ArgumentParser()
    parser.add_argument('--ascii_only', '-a', dest='ascii_only', action='store_true')
    parser.add_argument('--newline', '-n', type=int, default=None, dest='newline')
    parser.set_defaults(ascii_only=False)

    args = parser.parse_args()

    if args.ascii_only:
        output_encoding = "ascii"
        escape_ascii_chars = JISQuopriEncode.ASCII_NOT_PRINTABLE + ['\\', '"', "'", ",", " "] # 一部のascii記号を追加でエスケープする
        escape_multibyte_chars = []
    else:
        output_encoding = "utf-8"
        escape_ascii_chars = JISQuopriEncode.ASCII_NOT_PRINTABLE + ['\\', '"', "'", ",", " "]
        escape_multibyte_chars = [" "] # 全角スペースはエスケープする
    jqe = JISQuopriEncode(escape_ascii_chars=escape_ascii_chars,
                          escape_multibyte_chars=escape_multibyte_chars,
                          newline=args.newline,
                          ascii_only=args.ascii_only)

    while True:
        dat = sys.stdin.buffer.read(BUFSIZE)
        if args.newline:
            sys.stdout.write(jqe.encode(dat).decode(output_encoding) + "\n")
        else:
            sys.stdout.write(jqe.encode(dat).decode(output_encoding))
        if len(dat) < BUFSIZE:
            break

    if not args.newline:
        print("")

jisquopri_decode.py

import sys
from JISQuopri import JISQuopriDecode

if __name__ == "__main__":
    BUFSIZE = 1024

    jqd = JISQuopriDecode()
    while True:
        dat = sys.stdin.buffer.read(BUFSIZE)
        sys.stdout.buffer.write(jqd.decode(dat))
        if len(dat) < BUFSIZE:
            break

create_jisx_json.py

import json
jislist = []
with open("JIS0208.TXT", "r") as f:
    for line in f.readlines():
        line = line.rstrip()
        if line[0] == "#":
            continue
        dat = line.split("\t")
        unicode_id = eval(dat[2])
        uchar = unicode_id.to_bytes(2,"little").decode("utf-16-le")
        utf8_bytes = [_byte for _byte in uchar.encode("utf-8")]
        utf8_length = len(utf8_bytes)
        if uchar == "\\":
            continue
        jislist.append({"char": uchar, "utf8bytes": utf8_bytes, "utf8length": utf8_length})


print(json.dumps(jislist, ensure_ascii=False, indent=2))