ツタンラーメンの忘備録

プログラミングや精神疾患、ラーメンについて書いていきます。たぶん。

ArduinoとpythonのSerial通信

基本形。ほんとに単純化した者。
pythonのコードを記載しておく。Arduinoは方位センサを使っている。

pip install pyserial

して

import serial
import re

def main():
    with serial.Serial('COM3',9600,timeout=1) as ser:
        while True:
            c = ser.readline()
            de = c.decode('utf-8') #バイトから文字として認識できる形に。
            m = re.match("\-*\d+", str(de))
            if(m != None):
                print(m.group())
            else:
                pass
                #print(type(m))
        ser.close()

if __name__ == "__main__":
    main()

ちなみに方位センサのコードは

#include <Wire.h>
#include "skHMC5883L.h"
#define SENSOR_ADRS     0x1E
skHMC5883L  Compass(SENSOR_ADRS);

void setup() {
  int ans;
  Serial.begin(9600);
  Wire.begin();
  ans = Compass.Begin() ;
  if (ans == 0) Serial.println("Initialization normal") ;
  else {
    Serial.print("Initialization abnormal ans=") ;
    Serial.println(ans) ;
  }
}

void loop() {
  int ans;
  float deg;
  ans = Compass.SingleRead(&deg,6.6);
  if(ans == 0){
    Serial.println(RowX);
    Serial.println(RowY);
    Serial.println(RowZ);
  }else Serial.println("NG");
  delay(1000);
}

別にサンプルコードのままだから特に何も。

pythonでLINEのreplyからuserIdを取得する あと timestampのdatetimeへの変更

単純なんだけどはまったので

event.source.userId

だと思っていたんだけど

event.source.user_id

なんだよね…。

あと現在時刻の取得。herokuでやっているから時差あるけど…。

import time
from datetime import datetime

#

print(datetime.fromtimestamp(event.timestamp/1000.0))

#

何も考えずにスクレイピングをpythonで始めてみた。

やってみたかったので始めました。王道かなと
scrapy
を入れてみたのですが別に使いやすくないなーと思いました。
入れ方はこちら
Installation guide — Scrapy 1.1.3 documentation

pip install scrapy

で入らなかったので詰んだかなと思ったのですが、上記手順で普通に入った。
あとは適当にごにょごにょ。

でも直感的じゃない。例えばサンプルコードは
Scrapy | A Fast and Powerful Scraping and Web Crawling Framework

class BlogSpider(scrapy.Spider):
    name = 'blogspider'
    start_urls = [url]

    def parse(self, response):
        for title in response.css('h2.entry-title'):
            yield {'title': title.css('a ::text').extract_first()}

        next_page = response.css('div.prev-post > a ::attr(href)').extract_first()
        if next_page:

実行コマンドも

scrapy runspider myspider.py

みたいな感じ。
コンソールにやたら出るし初心者には直感的じゃないよねぇ

というわけでbeautifulSoup4
PythonとBeautiful Soupでスクレイピング - Qiita

超簡単!
リンクが直接は得られないようなので
url.requestを僕は使いました(何度もpip installしようとしたのですが、標準搭載の当たり前の機能でした)

書いたコードがこれ

from bs4 import BeautifulSoup
import urllib.request
html = urllib.request.urlopen(url)

soup = BeautifulSoup(html)

print(soup.find_all("a")[0].get("href"))

find_allして最初の配列を取るとかいう意味不明なことしてますが許して。
楽しい!

chainerで音声認識

あとで整形します。
メインコード

from chainer import Link, Chain, ChainList, Variable
import chainer.functions as F
import chainer.links as L
import chainer
from chainer import training
from chainer.training import extensions
import numpy as np
import MFCC

class VoiceAnalysis(Chain):
    def __init__(self, n_units=64, n_out=1):
        super(VoiceAnalysis, self).__init__(
            l1 = L.Linear(None, n_units),
            l2 = L.Linear(None, n_units),
            l3 = L.Linear(None, n_out),
        )
    def __call__(self, x):
        h1 = F.sigmoid(self.l1(x))
        h2 = F.sigmoid(self.l2(h1))
        o = self.l3(h2)
        return o

def main():
    batchsize = 100
    epoch = 150
    frequency = -1
    out = 'result'
    unit = 1000

    model = L.Classifier(VoiceAnalysis(unit, 3))
    optimizer = chainer.optimizers.Adam()
    optimizer.setup(model)
    name_list = ["en_a", "ja_a", "no_voice"]
    train = MFCC.read_ceps(name_list)
    test = train
    train_iter = chainer.iterators.SerialIterator(train, batchsize)
    test_iter = chainer.iterators.SerialIterator(test, batchsize, repeat=False, shuffle=False)

    updater = training.StandardUpdater(train_iter, optimizer)
    #print(updater)
    trainer = training.Trainer(updater, (epoch, 'epoch'), out)
    #print(trainer)
    trainer.extend(extensions.Evaluator(test_iter, model))
    trainer.extend(extensions.LogReport())

    trainer.extend(extensions.PrintReport(
        ['epoch', 'main/loss', 'validation/main/loss',
         'main/accuracy', 'validation/main/accuracy']))

    trainer.extend(extensions.ProgressBar())
    trainer.run()

    test = MFCC.read_ceps(["test_ja_a", "test_en_a", "test_no_voice"])
    pred_list = []
    for (data, label) in test:
        pred_data = model.predictor(np.array([data]).astype(np.float32)).data
        pred_list.append((pred_data, label))
        print(pred_data, label)

if __name__ == '__main__':
    main()


MFCCの部分。そのへんに落ちているのだと、普通にライブラリが読み込めず断念

from python_speech_features import mfcc
import scipy
from scipy import io
from scipy.io import wavfile
import glob
import numpy as np
import os
from chainer.datasets import tuple_dataset


def write_ceps(ceps,fn):
    base_fn,ext = os.path.splitext(fn)
    data_fn = base_fn + ".ceps"
    np.save(data_fn,ceps)

def create_ceps(fn):
    (rate, X) = io.wavfile.read(fn)
    ceps = mfcc(X, rate)
    isNan = False
    for num in ceps:
        if np.isnan(num[1]):
            isNan = True
    if isNan == False:
        write_ceps(ceps,fn)

def read_ceps(name_list):
    X,y = [],[]
    train = []
    base_dir = os.getcwd()
    for label,name in enumerate(name_list):
        for fn in glob.glob(os.path.join(base_dir,name,"*.ceps.npy")):
            ceps = np.load(fn)
            num_ceps = len(ceps)
            X.append(np.mean(ceps[:],axis=0))
            y.append(label)
            train.append((np.float32(np.mean(ceps[:],axis=0)), np.int32(label))) #ここどう考えてもきもい
    return train

chainerのサンプルコードを理解する。

pc.atsuhiro-me.net
このサイトをコピペしたら動かしたら動いたのだが、何をしているのかを調べていきたいと思います

import json, sys, glob, datetime, math
import numpy as np
import matplotlib.pyplot as plt

import chainer
from chainer import computational_graph as c
from chainer import cuda
import chainer.functions as F
from chainer import optimizers

class RegressionFNN:
    def __init__(self, n_units=64, batchsize=100):
        self.n_units = n_units
        self.batchsize = batchsize
        self.plotcount=0

    def load(self, train_x, train_y):
        if len(train_x)!=len(train_y):
            raise ValueError
        self.N = len(train_y)
        self.I = 1
        self.x_train = np.array(train_x, np.float32).reshape(len(train_x),1)
        self.y_train = np.array(train_y, np.float32).reshape(len(train_y),)
        #
        self.model = chainer.FunctionSet(l1=F.Linear(self.I, self.n_units),
                                        l2=F.Linear(self.n_units, self.n_units),
                                        l3=F.Linear(self.n_units, self.n_units),
                                        l4=F.Linear(self.n_units, 1))
        #
        self.optimizer = optimizers.Adam()
        self.optimizer.setup(self.model.collect_parameters())


    def forward(self, x_data, y_data, train=True):
        x = chainer.Variable(x_data)
        t = chainer.Variable(y_data)
        h1 = F.relu(self.model.l1(x))
        h2 = F.relu(self.model.l2(h1))
        h3 = F.relu(self.model.l3(h2))
        y = F.reshape(self.model.l4(h3), (len(y_data), ))
        return F.mean_squared_error(y, t), y

    def calc(self, n_epoch):
        for epoch in range(n_epoch):
            perm = np.random.permutation(self.N)
            sum_loss = 0
            #
            for i in range(0, self.N, self.batchsize):
                x_batch = self.x_train[perm[i:i + self.batchsize]]
                y_batch = self.y_train[perm[i:i + self.batchsize]]
                #
                self.optimizer.zero_grads()
                loss, y = self.forward(x_batch, y_batch)
                loss.backward()
                self.optimizer.update()
                #
                sum_loss += float(loss.data) * len(y_batch)
            #
            print('epoch = {}, train mean loss={}\r'.format(epoch, sum_loss / self.N), end="")

    def getY(self, test_x, test_y):
        if len(test_x)!=len(test_y):
            raise ValueError
        x_test = np.array(test_x, np.float32).reshape(len(test_x),1)
        y_test = np.array(test_y, np.float32).reshape(len(test_y),)
        loss, y = self.forward(x_test, y_test, train=False)
        #
        with open("output/{}.csv".format(self.plotcount), "w") as f:
            f.write("\n".join(["{},{}".format(ux,uy) for ux,uy in zip(y.data, y_test)]))

        #
        plt.clf()
        plt.plot(x_test, y_test, color="b", label="original")
        plt.plot(x_test, y.data, color="g", label="RegFNN")
        plt.legend(loc = 'lower left')
        plt.ylim(-1.2,1.2)
        plt.grid(True)
        plt.savefig("output/{}.png".format(self.plotcount))
        self.plotcount+=1


def f(d):
    return [math.sin(u) for u in d]
if __name__=="__main__":
    rf = RegressionFNN(n_units=64, batchsize=314)
    d = [u*0.02 for u in range(314)]
    rf.load(d,f(d))
    rf.calc(1000) # epoch
    rf.getY(d,f(d))
    print("\ndone.")

ここの部分を見る。

        self.x_train = np.array(train_x, np.float32).reshape(len(train_x),1)
        self.y_train = np.array(train_y, np.float32).reshape(len(train_y),)

NumPy 配列の基礎 — 機械学習の Python との出会い

ここを参考にした。

arrayは配列を作れる。pythonの普通の配列と違うけど、そこは割愛する。

array(配列もしくはタプル, 型)

でざっくり言うと行列が作れる。たとえば

array([[1,2,3], [2,4,6]], float32)

とすると

[[1.0, 2.0, 3.0], 
[2.0, 4.0, 6.0]]

という具合らしい。
二重にネストしている点に注意する。
また第二引数はfloat32とかint64とかcomplex128(複素数)とかが入る。

配列の次元数や大きさの操作 — 機械学習の Python との出会い
reshapeは配列を望んだ形に変える。

reshape([1,2,3,2,4,6], (2, 3))

だと

[[1, 2, 3],
[2, 4, 6]]

となる。1×6の行列を2×3にした。積が一緒でないとエラーが出る。
また-1をしていすると、うまいぐあいに調節してくれる。
(2, -1)と書いた場合2×3行列にしてくれる。

では

np.array(train_x, np.float32).reshape(len(train_x),1)

はなにをしているのかというと、train_xの長さ×1、つまり縦ベクトルに変換していることになる。(らしい)
numpyの1d-arrayを2d-arrayに変換 - keisukeのブログ

np.array(train_y, np.float32).reshape(len(train_y),)

は、横のまま?わからぬ。

Linear(変換前の行列の次元, 変換後の行列の次元)

> Linearは全ての入力と出力がつながっているようなニューラルネットワークを表します。 Linearはニューラルネットワークの文脈では総結合層,数学の用語では線形変換,アフィン変換とよびます。 たとえば,次の例では5個のユニットから,2個のユニットへの変換を表します。
ただし、これはlinks.Linearの場合なので違うかも。